HDFS-2686. Merging change 1375800 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1375806 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1ee0c23b39
commit
63810d26ae
|
@ -3,7 +3,6 @@ Hadoop HDFS Change Log
|
||||||
Release 2.0.1-alpha - UNRELEASED
|
Release 2.0.1-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
||||||
HDFS-3446. HostsFileReader silently ignores bad includes/excludes
|
HDFS-3446. HostsFileReader silently ignores bad includes/excludes
|
||||||
(Matthew Jacobs via todd)
|
(Matthew Jacobs via todd)
|
||||||
|
|
||||||
|
@ -501,6 +500,8 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
|
|
||||||
HDFS-3432. TestDFSZKFailoverController tries to fail over too early (todd)
|
HDFS-3432. TestDFSZKFailoverController tries to fail over too early (todd)
|
||||||
|
|
||||||
|
HDFS-2686. Remove DistributedUpgrade related code. (suresh)
|
||||||
|
|
||||||
Release 2.0.0-alpha - 05-23-2012
|
Release 2.0.0-alpha - 05-23-2012
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -695,8 +695,9 @@ public interface ClientProtocol {
|
||||||
public void finalizeUpgrade() throws IOException;
|
public void finalizeUpgrade() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Report distributed upgrade progress or force current upgrade to proceed.
|
* <em>Method no longer used - retained only for backward compatibility</em>
|
||||||
*
|
*
|
||||||
|
* Report distributed upgrade progress or force current upgrade to proceed.
|
||||||
* @param action {@link HdfsConstants.UpgradeAction} to perform
|
* @param action {@link HdfsConstants.UpgradeAction} to perform
|
||||||
* @return upgrade status information or null if no upgrades are in progress
|
* @return upgrade status information or null if no upgrades are in progress
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
|
|
@ -389,8 +389,8 @@ public class PBHelper {
|
||||||
public static NamespaceInfo convert(NamespaceInfoProto info) {
|
public static NamespaceInfo convert(NamespaceInfoProto info) {
|
||||||
StorageInfoProto storage = info.getStorageInfo();
|
StorageInfoProto storage = info.getStorageInfo();
|
||||||
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
|
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
|
||||||
info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion(),
|
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
|
||||||
info.getBuildVersion(), info.getSoftwareVersion());
|
info.getSoftwareVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
|
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
|
||||||
|
@ -898,7 +898,7 @@ public class PBHelper {
|
||||||
return NamespaceInfoProto.newBuilder()
|
return NamespaceInfoProto.newBuilder()
|
||||||
.setBlockPoolID(info.getBlockPoolID())
|
.setBlockPoolID(info.getBlockPoolID())
|
||||||
.setBuildVersion(info.getBuildVersion())
|
.setBuildVersion(info.getBuildVersion())
|
||||||
.setDistUpgradeVersion(info.getDistributedUpgradeVersion())
|
.setUnused(0)
|
||||||
.setStorageInfo(PBHelper.convert((StorageInfo)info))
|
.setStorageInfo(PBHelper.convert((StorageInfo)info))
|
||||||
.setSoftwareVersion(info.getSoftwareVersion()).build();
|
.setSoftwareVersion(info.getSoftwareVersion()).build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.common;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.SortedSet;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generic upgrade manager.
|
|
||||||
*
|
|
||||||
* {@link #broadcastCommand} is the command that should be
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public abstract class UpgradeManager {
|
|
||||||
protected SortedSet<Upgradeable> currentUpgrades = null;
|
|
||||||
protected boolean upgradeState = false; // true if upgrade is in progress
|
|
||||||
protected int upgradeVersion = 0;
|
|
||||||
protected UpgradeCommand broadcastCommand = null;
|
|
||||||
|
|
||||||
public synchronized UpgradeCommand getBroadcastCommand() {
|
|
||||||
return this.broadcastCommand;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean getUpgradeState() {
|
|
||||||
return this.upgradeState;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized int getUpgradeVersion(){
|
|
||||||
return this.upgradeVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void setUpgradeState(boolean uState, int uVersion) {
|
|
||||||
this.upgradeState = uState;
|
|
||||||
this.upgradeVersion = uVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SortedSet<Upgradeable> getDistributedUpgrades() throws IOException {
|
|
||||||
return UpgradeObjectCollection.getDistributedUpgrades(
|
|
||||||
getUpgradeVersion(), getType());
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized short getUpgradeStatus() {
|
|
||||||
if(currentUpgrades == null)
|
|
||||||
return 100;
|
|
||||||
return currentUpgrades.first().getUpgradeStatus();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean initializeUpgrade() throws IOException {
|
|
||||||
currentUpgrades = getDistributedUpgrades();
|
|
||||||
if(currentUpgrades == null) {
|
|
||||||
// set new upgrade state
|
|
||||||
setUpgradeState(false, HdfsConstants.LAYOUT_VERSION);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
Upgradeable curUO = currentUpgrades.first();
|
|
||||||
// set and write new upgrade state into disk
|
|
||||||
setUpgradeState(true, curUO.getVersion());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean isUpgradeCompleted() {
|
|
||||||
if (currentUpgrades == null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public abstract HdfsServerConstants.NodeType getType();
|
|
||||||
public abstract boolean startUpgrade() throws IOException;
|
|
||||||
public abstract void completeUpgrade() throws IOException;
|
|
||||||
}
|
|
|
@ -1,74 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.common;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeObjectCollection.UOSignature;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Abstract upgrade object.
|
|
||||||
*
|
|
||||||
* Contains default implementation of common methods of {@link Upgradeable}
|
|
||||||
* interface.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public abstract class UpgradeObject implements Upgradeable {
|
|
||||||
protected short status;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public short getUpgradeStatus() {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDescription() {
|
|
||||||
return "Upgrade object for " + getType() + " layout version " + getVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public UpgradeStatusReport getUpgradeStatusReport(boolean details)
|
|
||||||
throws IOException {
|
|
||||||
return new UpgradeStatusReport(getVersion(), getUpgradeStatus(), false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(Upgradeable o) {
|
|
||||||
if(this.getVersion() != o.getVersion())
|
|
||||||
return (getVersion() > o.getVersion() ? -1 : 1);
|
|
||||||
int res = this.getType().toString().compareTo(o.getType().toString());
|
|
||||||
if(res != 0)
|
|
||||||
return res;
|
|
||||||
return getClass().getCanonicalName().compareTo(
|
|
||||||
o.getClass().getCanonicalName());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (!(o instanceof UpgradeObject)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return this.compareTo((UpgradeObject)o) == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return new UOSignature(this).hashCode();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,135 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.common;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Collection of upgrade objects.
|
|
||||||
*
|
|
||||||
* Upgrade objects should be registered here before they can be used.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class UpgradeObjectCollection {
|
|
||||||
static {
|
|
||||||
initialize();
|
|
||||||
// Registered distributed upgrade objects here
|
|
||||||
// registerUpgrade(new UpgradeObject());
|
|
||||||
}
|
|
||||||
|
|
||||||
static class UOSignature implements Comparable<UOSignature> {
|
|
||||||
int version;
|
|
||||||
HdfsServerConstants.NodeType type;
|
|
||||||
String className;
|
|
||||||
|
|
||||||
UOSignature(Upgradeable uo) {
|
|
||||||
this.version = uo.getVersion();
|
|
||||||
this.type = uo.getType();
|
|
||||||
this.className = uo.getClass().getCanonicalName();
|
|
||||||
}
|
|
||||||
|
|
||||||
int getVersion() {
|
|
||||||
return version;
|
|
||||||
}
|
|
||||||
|
|
||||||
HdfsServerConstants.NodeType getType() {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
String getClassName() {
|
|
||||||
return className;
|
|
||||||
}
|
|
||||||
|
|
||||||
Upgradeable instantiate() throws IOException {
|
|
||||||
try {
|
|
||||||
return (Upgradeable)Class.forName(getClassName()).newInstance();
|
|
||||||
} catch(ClassNotFoundException e) {
|
|
||||||
throw new IOException(StringUtils.stringifyException(e));
|
|
||||||
} catch(InstantiationException e) {
|
|
||||||
throw new IOException(StringUtils.stringifyException(e));
|
|
||||||
} catch(IllegalAccessException e) {
|
|
||||||
throw new IOException(StringUtils.stringifyException(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(UOSignature o) {
|
|
||||||
if(this.version != o.version)
|
|
||||||
return (version < o.version ? -1 : 1);
|
|
||||||
int res = this.getType().toString().compareTo(o.getType().toString());
|
|
||||||
if(res != 0)
|
|
||||||
return res;
|
|
||||||
return className.compareTo(o.className);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (!(o instanceof UOSignature)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return this.compareTo((UOSignature)o) == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return version ^ ((type==null)?0:type.hashCode())
|
|
||||||
^ ((className==null)?0:className.hashCode());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Static collection of upgrade objects sorted by version.
|
|
||||||
* Layout versions are negative therefore newer versions will go first.
|
|
||||||
*/
|
|
||||||
static SortedSet<UOSignature> upgradeTable;
|
|
||||||
|
|
||||||
static final void initialize() {
|
|
||||||
upgradeTable = new TreeSet<UOSignature>();
|
|
||||||
}
|
|
||||||
|
|
||||||
static void registerUpgrade(Upgradeable uo) {
|
|
||||||
// Registered distributed upgrade objects here
|
|
||||||
upgradeTable.add(new UOSignature(uo));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static SortedSet<Upgradeable> getDistributedUpgrades(int versionFrom,
|
|
||||||
HdfsServerConstants.NodeType type
|
|
||||||
) throws IOException {
|
|
||||||
assert HdfsConstants.LAYOUT_VERSION <= versionFrom : "Incorrect version "
|
|
||||||
+ versionFrom + ". Expected to be <= " + HdfsConstants.LAYOUT_VERSION;
|
|
||||||
SortedSet<Upgradeable> upgradeObjects = new TreeSet<Upgradeable>();
|
|
||||||
for(UOSignature sig : upgradeTable) {
|
|
||||||
if(sig.getVersion() < HdfsConstants.LAYOUT_VERSION)
|
|
||||||
continue;
|
|
||||||
if(sig.getVersion() > versionFrom)
|
|
||||||
break;
|
|
||||||
if(sig.getType() != type )
|
|
||||||
continue;
|
|
||||||
upgradeObjects.add(sig.instantiate());
|
|
||||||
}
|
|
||||||
if(upgradeObjects.size() == 0)
|
|
||||||
return null;
|
|
||||||
return upgradeObjects;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.common;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Common interface for distributed upgrade objects.
|
|
||||||
*
|
|
||||||
* Each upgrade object corresponds to a layout version,
|
|
||||||
* which is the latest version that should be upgraded using this object.
|
|
||||||
* That is all components whose layout version is greater or equal to the
|
|
||||||
* one returned by {@link #getVersion()} must be upgraded with this object.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public interface Upgradeable extends Comparable<Upgradeable> {
|
|
||||||
/**
|
|
||||||
* Get the layout version of the upgrade object.
|
|
||||||
* @return layout version
|
|
||||||
*/
|
|
||||||
int getVersion();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the type of the software component, which this object is upgrading.
|
|
||||||
* @return type
|
|
||||||
*/
|
|
||||||
HdfsServerConstants.NodeType getType();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Description of the upgrade object for displaying.
|
|
||||||
* @return description
|
|
||||||
*/
|
|
||||||
String getDescription();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Upgrade status determines a percentage of the work done out of the total
|
|
||||||
* amount required by the upgrade.
|
|
||||||
*
|
|
||||||
* 100% means that the upgrade is completed.
|
|
||||||
* Any value < 100 means it is not complete.
|
|
||||||
*
|
|
||||||
* The return value should provide at least 2 values, e.g. 0 and 100.
|
|
||||||
* @return integer value in the range [0, 100].
|
|
||||||
*/
|
|
||||||
short getUpgradeStatus();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Prepare for the upgrade.
|
|
||||||
* E.g. initialize upgrade data structures and set status to 0.
|
|
||||||
*
|
|
||||||
* Returns an upgrade command that is used for broadcasting to other cluster
|
|
||||||
* components.
|
|
||||||
* E.g. name-node informs data-nodes that they must perform a distributed upgrade.
|
|
||||||
*
|
|
||||||
* @return an UpgradeCommand for broadcasting.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
UpgradeCommand startUpgrade() throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Complete upgrade.
|
|
||||||
* E.g. cleanup upgrade data structures or write metadata to disk.
|
|
||||||
*
|
|
||||||
* Returns an upgrade command that is used for broadcasting to other cluster
|
|
||||||
* components.
|
|
||||||
* E.g. data-nodes inform the name-node that they completed the upgrade
|
|
||||||
* while other data-nodes are still upgrading.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
UpgradeCommand completeUpgrade() throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get status report for the upgrade.
|
|
||||||
*
|
|
||||||
* @param details true if upgradeStatus details need to be included,
|
|
||||||
* false otherwise
|
|
||||||
* @return {@link UpgradeStatusReport}
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
UpgradeStatusReport getUpgradeStatusReport(boolean details) throws IOException;
|
|
||||||
}
|
|
|
@ -74,7 +74,6 @@ class BPOfferService {
|
||||||
*/
|
*/
|
||||||
DatanodeRegistration bpRegistration;
|
DatanodeRegistration bpRegistration;
|
||||||
|
|
||||||
UpgradeManagerDatanode upgradeManager = null;
|
|
||||||
private final DataNode dn;
|
private final DataNode dn;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -260,33 +259,6 @@ class BPOfferService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized UpgradeManagerDatanode getUpgradeManager() {
|
|
||||||
if(upgradeManager == null)
|
|
||||||
upgradeManager =
|
|
||||||
new UpgradeManagerDatanode(dn, getBlockPoolId());
|
|
||||||
|
|
||||||
return upgradeManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
void processDistributedUpgradeCommand(UpgradeCommand comm)
|
|
||||||
throws IOException {
|
|
||||||
UpgradeManagerDatanode upgradeManager = getUpgradeManager();
|
|
||||||
upgradeManager.processUpgradeCommand(comm);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start distributed upgrade if it should be initiated by the data-node.
|
|
||||||
*/
|
|
||||||
synchronized void startDistributedUpgradeIfNeeded() throws IOException {
|
|
||||||
UpgradeManagerDatanode um = getUpgradeManager();
|
|
||||||
|
|
||||||
if(!um.getUpgradeState())
|
|
||||||
return;
|
|
||||||
um.setUpgradeState(false, um.getUpgradeVersion());
|
|
||||||
um.startUpgrade();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
DataNode getDataNode() {
|
DataNode getDataNode() {
|
||||||
return dn;
|
return dn;
|
||||||
}
|
}
|
||||||
|
@ -374,9 +346,6 @@ class BPOfferService {
|
||||||
|
|
||||||
if (bpServices.isEmpty()) {
|
if (bpServices.isEmpty()) {
|
||||||
dn.shutdownBlockPool(this);
|
dn.shutdownBlockPool(this);
|
||||||
|
|
||||||
if(upgradeManager != null)
|
|
||||||
upgradeManager.shutdownUpgrade();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -594,7 +563,7 @@ class BPOfferService {
|
||||||
break;
|
break;
|
||||||
case UpgradeCommand.UC_ACTION_START_UPGRADE:
|
case UpgradeCommand.UC_ACTION_START_UPGRADE:
|
||||||
// start distributed upgrade here
|
// start distributed upgrade here
|
||||||
processDistributedUpgradeCommand((UpgradeCommand)cmd);
|
LOG.warn("Distibuted upgrade is no longer supported");
|
||||||
break;
|
break;
|
||||||
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
||||||
String who = "NameNode at " + actor.getNNSocketAddress();
|
String who = "NameNode at " + actor.getNNSocketAddress();
|
||||||
|
|
|
@ -324,7 +324,7 @@ class BPServiceActor implements Runnable {
|
||||||
* Run an immediate block report on this thread. Used by tests.
|
* Run an immediate block report on this thread. Used by tests.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void triggerBlockReportForTests() throws IOException {
|
void triggerBlockReportForTests() {
|
||||||
synchronized (pendingIncrementalBR) {
|
synchronized (pendingIncrementalBR) {
|
||||||
lastBlockReport = 0;
|
lastBlockReport = 0;
|
||||||
lastHeartbeat = 0;
|
lastHeartbeat = 0;
|
||||||
|
@ -340,7 +340,7 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void triggerHeartbeatForTests() throws IOException {
|
void triggerHeartbeatForTests() {
|
||||||
synchronized (pendingIncrementalBR) {
|
synchronized (pendingIncrementalBR) {
|
||||||
lastHeartbeat = 0;
|
lastHeartbeat = 0;
|
||||||
pendingIncrementalBR.notifyAll();
|
pendingIncrementalBR.notifyAll();
|
||||||
|
@ -355,7 +355,7 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void triggerDeletionReportForTests() throws IOException {
|
void triggerDeletionReportForTests() {
|
||||||
synchronized (pendingIncrementalBR) {
|
synchronized (pendingIncrementalBR) {
|
||||||
lastDeletedReport = 0;
|
lastDeletedReport = 0;
|
||||||
pendingIncrementalBR.notifyAll();
|
pendingIncrementalBR.notifyAll();
|
||||||
|
@ -670,7 +670,6 @@ class BPServiceActor implements Runnable {
|
||||||
|
|
||||||
while (shouldRun()) {
|
while (shouldRun()) {
|
||||||
try {
|
try {
|
||||||
bpos.startDistributedUpgradeIfNeeded();
|
|
||||||
offerService();
|
offerService();
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.error("Exception in BPOfferService for " + this, ex);
|
LOG.error("Exception in BPOfferService for " + this, ex);
|
||||||
|
|
|
@ -138,7 +138,7 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
// During startup some of them can upgrade or roll back
|
// During startup some of them can upgrade or roll back
|
||||||
// while others could be up-to-date for the regular startup.
|
// while others could be up-to-date for the regular startup.
|
||||||
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
||||||
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
|
doTransition(getStorageDir(idx), nsInfo, startOpt);
|
||||||
assert getLayoutVersion() == nsInfo.getLayoutVersion()
|
assert getLayoutVersion() == nsInfo.getLayoutVersion()
|
||||||
: "Data-node and name-node layout versions must be the same.";
|
: "Data-node and name-node layout versions must be the same.";
|
||||||
assert getCTime() == nsInfo.getCTime()
|
assert getCTime() == nsInfo.getCTime()
|
||||||
|
@ -232,7 +232,7 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
* @param startOpt startup option
|
* @param startOpt startup option
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void doTransition(DataNode datanode, StorageDirectory sd,
|
private void doTransition(StorageDirectory sd,
|
||||||
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
|
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
|
||||||
if (startOpt == StartupOption.ROLLBACK)
|
if (startOpt == StartupOption.ROLLBACK)
|
||||||
doRollback(sd, nsInfo); // rollback if applicable
|
doRollback(sd, nsInfo); // rollback if applicable
|
||||||
|
@ -254,13 +254,9 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
+ blockpoolID);
|
+ blockpoolID);
|
||||||
}
|
}
|
||||||
if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
|
if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
|
||||||
&& this.cTime == nsInfo.getCTime())
|
&& this.cTime == nsInfo.getCTime()) {
|
||||||
return; // regular startup
|
return; // regular startup
|
||||||
|
}
|
||||||
// verify necessity of a distributed upgrade
|
|
||||||
UpgradeManagerDatanode um =
|
|
||||||
datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
|
|
||||||
verifyDistributedUpgradeProgress(um, nsInfo);
|
|
||||||
if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
|
if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
|
||||||
|| this.cTime < nsInfo.getCTime()) {
|
|| this.cTime < nsInfo.getCTime()) {
|
||||||
doUpgrade(sd, nsInfo); // upgrade
|
doUpgrade(sd, nsInfo); // upgrade
|
||||||
|
@ -476,13 +472,6 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
LOG.info( hardLink.linkStats.report() );
|
LOG.info( hardLink.linkStats.report() );
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
|
|
||||||
NamespaceInfo nsInfo) throws IOException {
|
|
||||||
assert um != null : "DataNode.upgradeManager is null.";
|
|
||||||
um.setUpgradeState(false, getLayoutVersion());
|
|
||||||
um.initializeUpgrade(nsInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* gets the data node storage directory based on block pool storage
|
* gets the data node storage directory based on block pool storage
|
||||||
*
|
*
|
||||||
|
|
|
@ -99,13 +99,8 @@ public class DataBlockScanner implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for at least one block pool to be up
|
// Wait for at least one block pool to be up
|
||||||
private void waitForInit(String bpid) {
|
private void waitForInit() {
|
||||||
UpgradeManagerDatanode um = null;
|
while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
|
||||||
if(bpid != null && !bpid.equals(""))
|
|
||||||
um = datanode.getUpgradeManagerDatanode(bpid);
|
|
||||||
|
|
||||||
while ((um != null && ! um.isUpgradeCompleted())
|
|
||||||
|| (getBlockPoolSetSize() < datanode.getAllBpOs().length)
|
|
||||||
|| (getBlockPoolSetSize() < 1)) {
|
|| (getBlockPoolSetSize() < 1)) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
|
@ -129,7 +124,7 @@ public class DataBlockScanner implements Runnable {
|
||||||
String nextBpId = null;
|
String nextBpId = null;
|
||||||
while ((nextBpId == null) && datanode.shouldRun
|
while ((nextBpId == null) && datanode.shouldRun
|
||||||
&& !blockScannerThread.isInterrupted()) {
|
&& !blockScannerThread.isInterrupted()) {
|
||||||
waitForInit(currentBpId);
|
waitForInit();
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (getBlockPoolSetSize() > 0) {
|
if (getBlockPoolSetSize() > 0) {
|
||||||
// Find nextBpId by the minimum of the last scan time
|
// Find nextBpId by the minimum of the last scan time
|
||||||
|
|
|
@ -507,7 +507,7 @@ public class DataNode extends Configured
|
||||||
reason = "verifcation is not supported by SimulatedFSDataset";
|
reason = "verifcation is not supported by SimulatedFSDataset";
|
||||||
}
|
}
|
||||||
if (reason == null) {
|
if (reason == null) {
|
||||||
directoryScanner = new DirectoryScanner(this, data, conf);
|
directoryScanner = new DirectoryScanner(data, conf);
|
||||||
directoryScanner.start();
|
directoryScanner.start();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
|
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
|
||||||
|
@ -1223,17 +1223,8 @@ public class DataNode extends Configured
|
||||||
return xmitsInProgress.get();
|
return xmitsInProgress.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
|
private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
|
||||||
BPOfferService bpos = blockPoolManager.get(bpid);
|
throws IOException {
|
||||||
if(bpos==null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return bpos.getUpgradeManager();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void transferBlock( ExtendedBlock block,
|
|
||||||
DatanodeInfo xferTargets[]
|
|
||||||
) throws IOException {
|
|
||||||
BPOfferService bpos = getBPOSForBlock(block);
|
BPOfferService bpos = getBPOSForBlock(block);
|
||||||
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
||||||
|
|
||||||
|
@ -1871,8 +1862,7 @@ public class DataNode extends Configured
|
||||||
private void recoverBlock(RecoveringBlock rBlock) throws IOException {
|
private void recoverBlock(RecoveringBlock rBlock) throws IOException {
|
||||||
ExtendedBlock block = rBlock.getBlock();
|
ExtendedBlock block = rBlock.getBlock();
|
||||||
String blookPoolId = block.getBlockPoolId();
|
String blookPoolId = block.getBlockPoolId();
|
||||||
DatanodeInfo[] targets = rBlock.getLocations();
|
DatanodeID[] datanodeids = rBlock.getLocations();
|
||||||
DatanodeID[] datanodeids = (DatanodeID[])targets;
|
|
||||||
List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
|
List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
|
||||||
int errorCount = 0;
|
int errorCount = 0;
|
||||||
|
|
||||||
|
|
|
@ -396,10 +396,6 @@ public class DataStorage extends Storage {
|
||||||
if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
|
if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
|
||||||
&& this.cTime == nsInfo.getCTime())
|
&& this.cTime == nsInfo.getCTime())
|
||||||
return; // regular startup
|
return; // regular startup
|
||||||
// verify necessity of a distributed upgrade
|
|
||||||
UpgradeManagerDatanode um =
|
|
||||||
datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
|
|
||||||
verifyDistributedUpgradeProgress(um, nsInfo);
|
|
||||||
|
|
||||||
// do upgrade
|
// do upgrade
|
||||||
if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
|
if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
|
||||||
|
@ -708,14 +704,6 @@ public class DataStorage extends Storage {
|
||||||
new File(to, otherNames[i]), oldLV, hl);
|
new File(to, otherNames[i]), oldLV, hl);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
|
|
||||||
NamespaceInfo nsInfo
|
|
||||||
) throws IOException {
|
|
||||||
assert um != null : "DataNode.upgradeManager is null.";
|
|
||||||
um.setUpgradeState(false, getLayoutVersion());
|
|
||||||
um.initializeUpgrade(nsInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add bpStorage into bpStorageMap
|
* Add bpStorage into bpStorageMap
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.hadoop.util.Time;
|
||||||
public class DirectoryScanner implements Runnable {
|
public class DirectoryScanner implements Runnable {
|
||||||
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
|
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
|
||||||
|
|
||||||
private final DataNode datanode;
|
|
||||||
private final FsDatasetSpi<?> dataset;
|
private final FsDatasetSpi<?> dataset;
|
||||||
private final ExecutorService reportCompileThreadPool;
|
private final ExecutorService reportCompileThreadPool;
|
||||||
private final ScheduledExecutorService masterThread;
|
private final ScheduledExecutorService masterThread;
|
||||||
|
@ -222,8 +221,7 @@ public class DirectoryScanner implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DirectoryScanner(DataNode dn, FsDatasetSpi<?> dataset, Configuration conf) {
|
DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
|
||||||
this.datanode = dn;
|
|
||||||
this.dataset = dataset;
|
this.dataset = dataset;
|
||||||
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
|
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
|
||||||
|
@ -271,17 +269,6 @@ public class DirectoryScanner implements Runnable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] bpids = dataset.getBlockPoolList();
|
|
||||||
for(String bpid : bpids) {
|
|
||||||
UpgradeManagerDatanode um =
|
|
||||||
datanode.getUpgradeManagerDatanode(bpid);
|
|
||||||
if (um != null && !um.isUpgradeCompleted()) {
|
|
||||||
//If distributed upgrades underway, exit and wait for next cycle.
|
|
||||||
LOG.warn("this cycle terminating immediately because Distributed Upgrade is in process");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//We're are okay to run - do it
|
//We're are okay to run - do it
|
||||||
reconcile();
|
reconcile();
|
||||||
|
|
||||||
|
|
|
@ -1,158 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
import org.apache.hadoop.util.Daemon;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Upgrade manager for data-nodes.
|
|
||||||
*
|
|
||||||
* Distributed upgrades for a data-node are performed in a separate thread.
|
|
||||||
* The upgrade starts when the data-node receives the start upgrade command
|
|
||||||
* from the namenode. At that point the manager finds a respective upgrade
|
|
||||||
* object and starts a daemon in order to perform the upgrade defined by the
|
|
||||||
* object.
|
|
||||||
*/
|
|
||||||
class UpgradeManagerDatanode extends UpgradeManager {
|
|
||||||
DataNode dataNode = null;
|
|
||||||
Daemon upgradeDaemon = null;
|
|
||||||
String bpid = null;
|
|
||||||
|
|
||||||
UpgradeManagerDatanode(DataNode dataNode, String bpid) {
|
|
||||||
super();
|
|
||||||
this.dataNode = dataNode;
|
|
||||||
this.bpid = bpid;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HdfsServerConstants.NodeType getType() {
|
|
||||||
return HdfsServerConstants.NodeType.DATA_NODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
|
|
||||||
if( ! super.initializeUpgrade())
|
|
||||||
return; // distr upgrade is not needed
|
|
||||||
DataNode.LOG.info("\n Distributed upgrade for DataNode "
|
|
||||||
+ dataNode.getDisplayName()
|
|
||||||
+ " version " + getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is initialized.");
|
|
||||||
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
|
|
||||||
curUO.setDatanode(dataNode, this.bpid);
|
|
||||||
upgradeState = curUO.preUpgradeAction(nsInfo);
|
|
||||||
// upgradeState is true if the data-node should start the upgrade itself
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start distributed upgrade.
|
|
||||||
* Instantiates distributed upgrade objects.
|
|
||||||
*
|
|
||||||
* @return true if distributed upgrade is required or false otherwise
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public synchronized boolean startUpgrade() throws IOException {
|
|
||||||
if(upgradeState) { // upgrade is already in progress
|
|
||||||
assert currentUpgrades != null :
|
|
||||||
"UpgradeManagerDatanode.currentUpgrades is null.";
|
|
||||||
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
|
|
||||||
curUO.startUpgrade();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if(broadcastCommand != null) {
|
|
||||||
if(broadcastCommand.getVersion() > this.getUpgradeVersion()) {
|
|
||||||
// stop broadcasting, the cluster moved on
|
|
||||||
// start upgrade for the next version
|
|
||||||
broadcastCommand = null;
|
|
||||||
} else {
|
|
||||||
// the upgrade has been finished by this data-node,
|
|
||||||
// but the cluster is still running it,
|
|
||||||
// reply with the broadcast command
|
|
||||||
assert currentUpgrades == null :
|
|
||||||
"UpgradeManagerDatanode.currentUpgrades is not null.";
|
|
||||||
assert upgradeDaemon == null :
|
|
||||||
"UpgradeManagerDatanode.upgradeDaemon is not null.";
|
|
||||||
DatanodeProtocol nn = dataNode.getActiveNamenodeForBP(bpid);
|
|
||||||
nn.processUpgradeCommand(broadcastCommand);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(currentUpgrades == null)
|
|
||||||
currentUpgrades = getDistributedUpgrades();
|
|
||||||
if(currentUpgrades == null) {
|
|
||||||
DataNode.LOG.info("\n Distributed upgrade for DataNode version "
|
|
||||||
+ getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " cannot be started. "
|
|
||||||
+ "The upgrade object is not defined.");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
upgradeState = true;
|
|
||||||
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
|
|
||||||
curUO.setDatanode(dataNode, this.bpid);
|
|
||||||
curUO.startUpgrade();
|
|
||||||
upgradeDaemon = new Daemon(curUO);
|
|
||||||
upgradeDaemon.start();
|
|
||||||
DataNode.LOG.info("\n Distributed upgrade for DataNode "
|
|
||||||
+ dataNode.getDisplayName()
|
|
||||||
+ " version " + getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is started.");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void processUpgradeCommand(UpgradeCommand command
|
|
||||||
) throws IOException {
|
|
||||||
assert command.getAction() == UpgradeCommand.UC_ACTION_START_UPGRADE :
|
|
||||||
"Only start upgrade action can be processed at this time.";
|
|
||||||
this.upgradeVersion = command.getVersion();
|
|
||||||
// Start distributed upgrade
|
|
||||||
if(startUpgrade()) // upgrade started
|
|
||||||
return;
|
|
||||||
throw new IOException(
|
|
||||||
"Distributed upgrade for DataNode " + dataNode.getDisplayName()
|
|
||||||
+ " version " + getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " cannot be started. "
|
|
||||||
+ "The upgrade object is not defined.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void completeUpgrade() throws IOException {
|
|
||||||
assert currentUpgrades != null :
|
|
||||||
"UpgradeManagerDatanode.currentUpgrades is null.";
|
|
||||||
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
|
|
||||||
broadcastCommand = curUO.completeUpgrade();
|
|
||||||
upgradeState = false;
|
|
||||||
currentUpgrades = null;
|
|
||||||
upgradeDaemon = null;
|
|
||||||
DataNode.LOG.info("\n Distributed upgrade for DataNode "
|
|
||||||
+ dataNode.getDisplayName()
|
|
||||||
+ " version " + getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is complete.");
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void shutdownUpgrade() {
|
|
||||||
if(upgradeDaemon != null)
|
|
||||||
upgradeDaemon.interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,142 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeObject;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.SocketTimeoutException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Base class for data-node upgrade objects.
|
|
||||||
* Data-node upgrades are run in separate threads.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public abstract class UpgradeObjectDatanode extends UpgradeObject implements Runnable {
|
|
||||||
private DataNode dataNode = null;
|
|
||||||
private String bpid = null;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HdfsServerConstants.NodeType getType() {
|
|
||||||
return HdfsServerConstants.NodeType.DATA_NODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected DataNode getDatanode() {
|
|
||||||
return dataNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected DatanodeProtocol getNamenode() throws IOException {
|
|
||||||
return dataNode.getActiveNamenodeForBP(bpid);
|
|
||||||
}
|
|
||||||
|
|
||||||
void setDatanode(DataNode dataNode, String bpid) {
|
|
||||||
this.dataNode = dataNode;
|
|
||||||
this.bpid = bpid;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Specifies how the upgrade is performed.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public abstract void doUpgrade() throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Specifies what to do before the upgrade is started.
|
|
||||||
*
|
|
||||||
* The default implementation checks whether the data-node missed the upgrade
|
|
||||||
* and throws an exception if it did. This leads to the data-node shutdown.
|
|
||||||
*
|
|
||||||
* Data-nodes usually start distributed upgrade when the name-node replies
|
|
||||||
* to its heartbeat with a start upgrade command.
|
|
||||||
* Sometimes though, e.g. when a data-node missed the upgrade and wants to
|
|
||||||
* catchup with the rest of the cluster, it is necessary to initiate the
|
|
||||||
* upgrade directly on the data-node, since the name-node might not ever
|
|
||||||
* start it. An override of this method should then return true.
|
|
||||||
* And the upgrade will start after data-ndoe registration but before sending
|
|
||||||
* its first heartbeat.
|
|
||||||
*
|
|
||||||
* @param nsInfo name-node versions, verify that the upgrade
|
|
||||||
* object can talk to this name-node version if necessary.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
* @return true if data-node itself should start the upgrade or
|
|
||||||
* false if it should wait until the name-node starts the upgrade.
|
|
||||||
*/
|
|
||||||
boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
|
|
||||||
int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
|
|
||||||
if(nsUpgradeVersion >= getVersion())
|
|
||||||
return false; // name-node will perform the upgrade
|
|
||||||
// Missed the upgrade. Report problem to the name-node and throw exception
|
|
||||||
String errorMsg =
|
|
||||||
"\n Data-node missed a distributed upgrade and will shutdown."
|
|
||||||
+ "\n " + getDescription() + "."
|
|
||||||
+ " Name-node version = " + nsInfo.getLayoutVersion() + ".";
|
|
||||||
DataNode.LOG.fatal( errorMsg );
|
|
||||||
String bpid = nsInfo.getBlockPoolID();
|
|
||||||
dataNode.trySendErrorReport(bpid, DatanodeProtocol.NOTIFY, errorMsg);
|
|
||||||
throw new IOException(errorMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
assert dataNode != null : "UpgradeObjectDatanode.dataNode is null";
|
|
||||||
while(dataNode.shouldRun) {
|
|
||||||
try {
|
|
||||||
doUpgrade();
|
|
||||||
} catch(Exception e) {
|
|
||||||
DataNode.LOG.error("Exception in doUpgrade", e);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// report results
|
|
||||||
if(getUpgradeStatus() < 100) {
|
|
||||||
DataNode.LOG.info("\n Distributed upgrade for DataNode version "
|
|
||||||
+ getVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " cannot be completed.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Complete the upgrade by calling the manager method
|
|
||||||
try {
|
|
||||||
UpgradeManagerDatanode upgradeManager =
|
|
||||||
dataNode.getUpgradeManagerDatanode(bpid);
|
|
||||||
if(upgradeManager != null)
|
|
||||||
upgradeManager.completeUpgrade();
|
|
||||||
} catch(IOException e) {
|
|
||||||
DataNode.LOG.error("Exception in completeUpgrade", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Complete upgrade and return a status complete command for broadcasting.
|
|
||||||
*
|
|
||||||
* Data-nodes finish upgrade at different times.
|
|
||||||
* The data-node needs to re-confirm with the name-node that the upgrade
|
|
||||||
* is complete while other nodes are still upgrading.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public UpgradeCommand completeUpgrade() throws IOException {
|
|
||||||
return new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS,
|
|
||||||
getVersion(), (short)100);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -94,8 +94,6 @@ public class FSImage implements Closeable {
|
||||||
/**
|
/**
|
||||||
* Construct an FSImage
|
* Construct an FSImage
|
||||||
* @param conf Configuration
|
* @param conf Configuration
|
||||||
* @see #FSImage(Configuration conf,
|
|
||||||
* Collection imageDirs, Collection editsDirs)
|
|
||||||
* @throws IOException if default directories are invalid.
|
* @throws IOException if default directories are invalid.
|
||||||
*/
|
*/
|
||||||
public FSImage(Configuration conf) throws IOException {
|
public FSImage(Configuration conf) throws IOException {
|
||||||
|
@ -172,8 +170,6 @@ public class FSImage implements Closeable {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"All specified directories are not accessible or do not exist.");
|
"All specified directories are not accessible or do not exist.");
|
||||||
|
|
||||||
storage.setUpgradeManager(target.upgradeManager);
|
|
||||||
|
|
||||||
// 1. For each data directory calculate its state and
|
// 1. For each data directory calculate its state and
|
||||||
// check whether all is consistent before transitioning.
|
// check whether all is consistent before transitioning.
|
||||||
Map<StorageDirectory, StorageState> dataDirStates =
|
Map<StorageDirectory, StorageState> dataDirStates =
|
||||||
|
@ -208,9 +204,6 @@ public class FSImage implements Closeable {
|
||||||
|
|
||||||
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
|
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
|
||||||
|
|
||||||
// check whether distributed upgrade is required and/or should be continued
|
|
||||||
storage.verifyDistributedUpgradeProgress(startOpt);
|
|
||||||
|
|
||||||
// 2. Format unformatted dirs.
|
// 2. Format unformatted dirs.
|
||||||
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
||||||
StorageDirectory sd = it.next();
|
StorageDirectory sd = it.next();
|
||||||
|
@ -301,13 +294,6 @@ public class FSImage implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doUpgrade(FSNamesystem target) throws IOException {
|
private void doUpgrade(FSNamesystem target) throws IOException {
|
||||||
if(storage.getDistributedUpgradeState()) {
|
|
||||||
// only distributed upgrade need to continue
|
|
||||||
// don't do version upgrade
|
|
||||||
this.loadFSImage(target, null);
|
|
||||||
storage.initializeDistributedUpgrade();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// Upgrade is allowed only if there are
|
// Upgrade is allowed only if there are
|
||||||
// no previous fs states in any of the directories
|
// no previous fs states in any of the directories
|
||||||
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
|
||||||
|
@ -390,7 +376,6 @@ public class FSImage implements Closeable {
|
||||||
+ storage.getRemovedStorageDirs().size()
|
+ storage.getRemovedStorageDirs().size()
|
||||||
+ " storage directory(ies), previously logged.");
|
+ " storage directory(ies), previously logged.");
|
||||||
}
|
}
|
||||||
storage.initializeDistributedUpgrade();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doRollback() throws IOException {
|
private void doRollback() throws IOException {
|
||||||
|
@ -453,8 +438,6 @@ public class FSImage implements Closeable {
|
||||||
LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
|
LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
|
||||||
}
|
}
|
||||||
isUpgradeFinalized = true;
|
isUpgradeFinalized = true;
|
||||||
// check whether name-node can start in regular mode
|
|
||||||
storage.verifyDistributedUpgradeProgress(StartupOption.REGULAR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doFinalize(StorageDirectory sd) throws IOException {
|
private void doFinalize(StorageDirectory sd) throws IOException {
|
||||||
|
|
|
@ -108,7 +108,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
|
@ -136,7 +135,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
@ -160,7 +158,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
|
@ -179,7 +176,6 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -927,8 +923,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
NamespaceInfo unprotectedGetNamespaceInfo() {
|
NamespaceInfo unprotectedGetNamespaceInfo() {
|
||||||
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
|
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
|
||||||
getClusterId(), getBlockPoolId(),
|
getClusterId(), getBlockPoolId(),
|
||||||
dir.fsImage.getStorage().getCTime(),
|
dir.fsImage.getStorage().getCTime());
|
||||||
upgradeManager.getUpgradeVersion());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3372,13 +3367,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
|
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
|
||||||
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
|
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
|
||||||
xceiverCount, maxTransfer, failedVolumes);
|
xceiverCount, maxTransfer, failedVolumes);
|
||||||
if (cmds == null || cmds.length == 0) {
|
|
||||||
DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
|
|
||||||
if (cmd != null) {
|
|
||||||
cmds = new DatanodeCommand[] {cmd};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
|
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
|
@ -3819,24 +3807,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
/**
|
/**
|
||||||
* Leave safe mode.
|
* Leave safe mode.
|
||||||
* <p>
|
* <p>
|
||||||
* Switch to manual safe mode if distributed upgrade is required.<br>
|
|
||||||
* Check for invalid, under- & over-replicated blocks in the end of startup.
|
* Check for invalid, under- & over-replicated blocks in the end of startup.
|
||||||
*/
|
*/
|
||||||
private synchronized void leave(boolean checkForUpgrades) {
|
private synchronized void leave() {
|
||||||
if(checkForUpgrades) {
|
|
||||||
// verify whether a distributed upgrade needs to be started
|
|
||||||
boolean needUpgrade = false;
|
|
||||||
try {
|
|
||||||
needUpgrade = upgradeManager.startUpgrade();
|
|
||||||
} catch(IOException e) {
|
|
||||||
FSNamesystem.LOG.error("IOException in startDistributedUpgradeIfNeeded", e);
|
|
||||||
}
|
|
||||||
if(needUpgrade) {
|
|
||||||
// switch to manual safe mode
|
|
||||||
safeMode = new SafeModeInfo(false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// if not done yet, initialize replication queues.
|
// if not done yet, initialize replication queues.
|
||||||
// In the standby, do not populate repl queues
|
// In the standby, do not populate repl queues
|
||||||
if (!isPopulatingReplQueues() && !isInStandbyState()) {
|
if (!isPopulatingReplQueues() && !isInStandbyState()) {
|
||||||
|
@ -3930,7 +3903,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
// the threshold is reached
|
// the threshold is reached
|
||||||
if (!isOn() || // safe mode is off
|
if (!isOn() || // safe mode is off
|
||||||
extension <= 0 || threshold <= 0) { // don't need to wait
|
extension <= 0 || threshold <= 0) { // don't need to wait
|
||||||
this.leave(true); // leave safe mode
|
this.leave(); // leave safe mode
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (reached > 0) { // threshold has already been reached before
|
if (reached > 0) { // threshold has already been reached before
|
||||||
|
@ -4034,10 +4007,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
leaveMsg = "Safe mode will be turned off automatically";
|
leaveMsg = "Safe mode will be turned off automatically";
|
||||||
}
|
}
|
||||||
if(isManual()) {
|
if(isManual()) {
|
||||||
if(upgradeManager.getUpgradeState())
|
|
||||||
return leaveMsg + " upon completion of " +
|
|
||||||
"the distributed upgrade: upgrade progress = " +
|
|
||||||
upgradeManager.getUpgradeStatus() + "%";
|
|
||||||
leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
|
leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4172,13 +4141,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread. ");
|
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread. ");
|
||||||
} else {
|
} else {
|
||||||
// leave safe mode and stop the monitor
|
// leave safe mode and stop the monitor
|
||||||
try {
|
leaveSafeMode();
|
||||||
leaveSafeMode(true);
|
|
||||||
} catch(SafeModeException es) { // should never happen
|
|
||||||
String msg = "SafeModeMonitor may not run during distributed upgrade.";
|
|
||||||
assert false : msg;
|
|
||||||
throw new RuntimeException(msg, es);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
smmthread = null;
|
smmthread = null;
|
||||||
}
|
}
|
||||||
|
@ -4189,7 +4152,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
checkSuperuserPrivilege();
|
checkSuperuserPrivilege();
|
||||||
switch(action) {
|
switch(action) {
|
||||||
case SAFEMODE_LEAVE: // leave safe mode
|
case SAFEMODE_LEAVE: // leave safe mode
|
||||||
leaveSafeMode(false);
|
leaveSafeMode();
|
||||||
break;
|
break;
|
||||||
case SAFEMODE_ENTER: // enter safe mode
|
case SAFEMODE_ENTER: // enter safe mode
|
||||||
enterSafeMode(false);
|
enterSafeMode(false);
|
||||||
|
@ -4374,17 +4337,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* Leave safe mode.
|
* Leave safe mode.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
|
void leaveSafeMode() {
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
if (!isInSafeMode()) {
|
if (!isInSafeMode()) {
|
||||||
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
|
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(upgradeManager.getUpgradeState())
|
safeMode.leave();
|
||||||
throw new SafeModeException("Distributed upgrade is in progress",
|
|
||||||
safeMode);
|
|
||||||
safeMode.leave(checkForUpgrades);
|
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
@ -4459,18 +4419,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return (blockManager.getBlockCollection(b) != null);
|
return (blockManager.getBlockCollection(b) != null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Distributed upgrade manager
|
|
||||||
final UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode(this);
|
|
||||||
|
|
||||||
UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
|
|
||||||
) throws IOException {
|
|
||||||
return upgradeManager.distributedUpgradeProgress(action);
|
|
||||||
}
|
|
||||||
|
|
||||||
UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
|
|
||||||
return upgradeManager.processUpgradeCommand(comm);
|
|
||||||
}
|
|
||||||
|
|
||||||
PermissionStatus createFsOwnerPermissions(FsPermission permission) {
|
PermissionStatus createFsOwnerPermissions(FsPermission permission) {
|
||||||
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
|
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,8 +33,6 @@ import java.util.Properties;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -46,7 +44,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
||||||
|
@ -67,8 +64,6 @@ import com.google.common.collect.Maps;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class NNStorage extends Storage implements Closeable,
|
public class NNStorage extends Storage implements Closeable,
|
||||||
StorageErrorReporter {
|
StorageErrorReporter {
|
||||||
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
|
|
||||||
|
|
||||||
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
|
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
|
||||||
static final String LOCAL_URI_SCHEME = "file";
|
static final String LOCAL_URI_SCHEME = "file";
|
||||||
|
|
||||||
|
@ -114,7 +109,6 @@ public class NNStorage extends Storage implements Closeable,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private UpgradeManager upgradeManager = null;
|
|
||||||
protected String blockpoolID = ""; // id of the block pool
|
protected String blockpoolID = ""; // id of the block pool
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -553,11 +547,8 @@ public class NNStorage extends Storage implements Closeable,
|
||||||
|
|
||||||
public static NamespaceInfo newNamespaceInfo()
|
public static NamespaceInfo newNamespaceInfo()
|
||||||
throws UnknownHostException {
|
throws UnknownHostException {
|
||||||
return new NamespaceInfo(
|
return new NamespaceInfo(newNamespaceID(), newClusterID(),
|
||||||
newNamespaceID(),
|
newBlockPoolID(), 0L);
|
||||||
newClusterID(),
|
|
||||||
newBlockPoolID(),
|
|
||||||
0L, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void format() throws IOException {
|
public void format() throws IOException {
|
||||||
|
@ -602,13 +593,6 @@ public class NNStorage extends Storage implements Closeable,
|
||||||
String sbpid = props.getProperty("blockpoolID");
|
String sbpid = props.getProperty("blockpoolID");
|
||||||
setBlockPoolID(sd.getRoot(), sbpid);
|
setBlockPoolID(sd.getRoot(), sbpid);
|
||||||
}
|
}
|
||||||
|
|
||||||
String sDUS, sDUV;
|
|
||||||
sDUS = props.getProperty("distributedUpgradeState");
|
|
||||||
sDUV = props.getProperty("distributedUpgradeVersion");
|
|
||||||
setDistributedUpgradeState(
|
|
||||||
sDUS == null? false : Boolean.parseBoolean(sDUS),
|
|
||||||
sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
|
|
||||||
setDeprecatedPropertiesForUpgrade(props);
|
setDeprecatedPropertiesForUpgrade(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -655,13 +639,6 @@ public class NNStorage extends Storage implements Closeable,
|
||||||
if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
|
if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
|
||||||
props.setProperty("blockpoolID", blockpoolID);
|
props.setProperty("blockpoolID", blockpoolID);
|
||||||
}
|
}
|
||||||
boolean uState = getDistributedUpgradeState();
|
|
||||||
int uVersion = getDistributedUpgradeVersion();
|
|
||||||
if(uState && uVersion != getLayoutVersion()) {
|
|
||||||
props.setProperty("distributedUpgradeState", Boolean.toString(uState));
|
|
||||||
props.setProperty("distributedUpgradeVersion",
|
|
||||||
Integer.toString(uVersion));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
|
static File getStorageFile(StorageDirectory sd, NameNodeFile type, long imageTxId) {
|
||||||
|
@ -734,7 +711,7 @@ public class NNStorage extends Storage implements Closeable,
|
||||||
* Return the first readable image file for the given txid, or null
|
* Return the first readable image file for the given txid, or null
|
||||||
* if no such image can be found
|
* if no such image can be found
|
||||||
*/
|
*/
|
||||||
File findImageFile(long txid) throws IOException {
|
File findImageFile(long txid) {
|
||||||
return findFile(NameNodeDirType.IMAGE,
|
return findFile(NameNodeDirType.IMAGE,
|
||||||
getImageFileName(txid));
|
getImageFileName(txid));
|
||||||
}
|
}
|
||||||
|
@ -755,76 +732,6 @@ public class NNStorage extends Storage implements Closeable,
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the upgrade manager for use in a distributed upgrade.
|
|
||||||
* @param um The upgrade manager
|
|
||||||
*/
|
|
||||||
void setUpgradeManager(UpgradeManager um) {
|
|
||||||
upgradeManager = um;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return The current distribued upgrade state.
|
|
||||||
*/
|
|
||||||
boolean getDistributedUpgradeState() {
|
|
||||||
return upgradeManager == null ? false : upgradeManager.getUpgradeState();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return The current upgrade version.
|
|
||||||
*/
|
|
||||||
int getDistributedUpgradeVersion() {
|
|
||||||
return upgradeManager == null ? 0 : upgradeManager.getUpgradeVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the upgrade state and version.
|
|
||||||
* @param uState the new state.
|
|
||||||
* @param uVersion the new version.
|
|
||||||
*/
|
|
||||||
private void setDistributedUpgradeState(boolean uState, int uVersion) {
|
|
||||||
if (upgradeManager != null) {
|
|
||||||
upgradeManager.setUpgradeState(uState, uVersion);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verify that the distributed upgrade state is valid.
|
|
||||||
* @param startOpt the option the namenode was started with.
|
|
||||||
*/
|
|
||||||
void verifyDistributedUpgradeProgress(StartupOption startOpt
|
|
||||||
) throws IOException {
|
|
||||||
if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
|
|
||||||
return;
|
|
||||||
|
|
||||||
assert upgradeManager != null : "FSNameSystem.upgradeManager is null.";
|
|
||||||
if(startOpt != StartupOption.UPGRADE) {
|
|
||||||
if(upgradeManager.getUpgradeState())
|
|
||||||
throw new IOException(
|
|
||||||
"\n Previous distributed upgrade was not completed. "
|
|
||||||
+ "\n Please restart NameNode with -upgrade option.");
|
|
||||||
if(upgradeManager.getDistributedUpgrades() != null)
|
|
||||||
throw new IOException("\n Distributed upgrade for NameNode version "
|
|
||||||
+ upgradeManager.getUpgradeVersion()
|
|
||||||
+ " to current LV " + HdfsConstants.LAYOUT_VERSION
|
|
||||||
+ " is required.\n Please restart NameNode"
|
|
||||||
+ " with -upgrade option.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize a distributed upgrade.
|
|
||||||
*/
|
|
||||||
void initializeDistributedUpgrade() throws IOException {
|
|
||||||
if(! upgradeManager.initializeUpgrade())
|
|
||||||
return;
|
|
||||||
// write new upgrade state into disk
|
|
||||||
writeAll();
|
|
||||||
LOG.info("\n Distributed upgrade for NameNode version "
|
|
||||||
+ upgradeManager.getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is initialized.");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disable the check for pre-upgradable layouts. Needed for BackupImage.
|
* Disable the check for pre-upgradable layouts. Needed for BackupImage.
|
||||||
* @param val Whether to disable the preupgradeable layout check.
|
* @param val Whether to disable the preupgradeable layout check.
|
||||||
|
|
|
@ -852,8 +852,7 @@ public class NameNode {
|
||||||
existingStorage.getNamespaceID(),
|
existingStorage.getNamespaceID(),
|
||||||
existingStorage.getClusterID(),
|
existingStorage.getClusterID(),
|
||||||
existingStorage.getBlockPoolID(),
|
existingStorage.getBlockPoolID(),
|
||||||
existingStorage.getCTime(),
|
existingStorage.getCTime()));
|
||||||
existingStorage.getDistributedUpgradeVersion()));
|
|
||||||
|
|
||||||
// Need to make sure the edit log segments are in good shape to initialize
|
// Need to make sure the edit log segments are in good shape to initialize
|
||||||
// the shared edits dir.
|
// the shared edits dir.
|
||||||
|
|
|
@ -743,8 +743,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
|
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
namesystem.checkOperation(OperationCategory.READ);
|
throw new UnsupportedActionException(
|
||||||
return namesystem.distributedUpgradeProgress(action);
|
"Deprecated method. No longer supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
|
@ -918,8 +918,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // DatanodeProtocol
|
@Override // DatanodeProtocol
|
||||||
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
|
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
|
||||||
return namesystem.processDistributedUpgradeCommand(comm);
|
throws IOException {
|
||||||
|
throw new UnsupportedActionException(
|
||||||
|
"Deprecated method, no longer supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -120,19 +120,6 @@ class NamenodeJspHelper {
|
||||||
return str;
|
return str;
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getUpgradeStatusText(FSNamesystem fsn) {
|
|
||||||
String statusText = "";
|
|
||||||
try {
|
|
||||||
UpgradeStatusReport status = fsn
|
|
||||||
.distributedUpgradeProgress(UpgradeAction.GET_STATUS);
|
|
||||||
statusText = (status == null ? "There are no upgrades in progress."
|
|
||||||
: status.getStatusText(false));
|
|
||||||
} catch (IOException e) {
|
|
||||||
statusText = "Upgrade status unknown.";
|
|
||||||
}
|
|
||||||
return statusText;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return a table containing version information. */
|
/** Return a table containing version information. */
|
||||||
static String getVersionTable(FSNamesystem fsn) {
|
static String getVersionTable(FSNamesystem fsn) {
|
||||||
return "<div class='dfstable'><table>"
|
return "<div class='dfstable'><table>"
|
||||||
|
@ -141,8 +128,6 @@ class NamenodeJspHelper {
|
||||||
+ VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
|
+ VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
|
||||||
+ "</td></tr>\n" + "\n <tr><td class='col1'>Compiled:</td><td>" + VersionInfo.getDate()
|
+ "</td></tr>\n" + "\n <tr><td class='col1'>Compiled:</td><td>" + VersionInfo.getDate()
|
||||||
+ " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch()
|
+ " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch()
|
||||||
+ "</td></tr>\n <tr><td class='col1'>Upgrades:</td><td>"
|
|
||||||
+ getUpgradeStatusText(fsn)
|
|
||||||
+ "</td></tr>\n <tr><td class='col1'>Cluster ID:</td><td>" + fsn.getClusterId()
|
+ "</td></tr>\n <tr><td class='col1'>Cluster ID:</td><td>" + fsn.getClusterId()
|
||||||
+ "</td></tr>\n <tr><td class='col1'>Block Pool ID:</td><td>" + fsn.getBlockPoolId()
|
+ "</td></tr>\n <tr><td class='col1'>Block Pool ID:</td><td>" + fsn.getBlockPoolId()
|
||||||
+ "</td></tr>\n</table></div>";
|
+ "</td></tr>\n</table></div>";
|
||||||
|
|
|
@ -1,147 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Upgrade manager for name-nodes.
|
|
||||||
*
|
|
||||||
* Distributed upgrades for a name-node starts when the safe mode conditions
|
|
||||||
* are met and the name-node is about to exit it.
|
|
||||||
* At this point the name-node enters manual safe mode which will remain
|
|
||||||
* on until the upgrade is completed.
|
|
||||||
* After that the name-nodes processes upgrade commands from data-nodes
|
|
||||||
* and updates its status.
|
|
||||||
*/
|
|
||||||
class UpgradeManagerNamenode extends UpgradeManager {
|
|
||||||
@Override
|
|
||||||
public HdfsServerConstants.NodeType getType() {
|
|
||||||
return HdfsServerConstants.NodeType.NAME_NODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
private final FSNamesystem namesystem;
|
|
||||||
|
|
||||||
UpgradeManagerNamenode(FSNamesystem namesystem) {
|
|
||||||
this.namesystem = namesystem;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start distributed upgrade.
|
|
||||||
* Instantiates distributed upgrade objects.
|
|
||||||
*
|
|
||||||
* @return true if distributed upgrade is required or false otherwise
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public synchronized boolean startUpgrade() throws IOException {
|
|
||||||
if(!upgradeState) {
|
|
||||||
initializeUpgrade();
|
|
||||||
if(!upgradeState) return false;
|
|
||||||
// write new upgrade state into disk
|
|
||||||
namesystem.getFSImage().getStorage().writeAll();
|
|
||||||
}
|
|
||||||
assert currentUpgrades != null : "currentUpgrades is null";
|
|
||||||
this.broadcastCommand = currentUpgrades.first().startUpgrade();
|
|
||||||
NameNode.LOG.info("\n Distributed upgrade for NameNode version "
|
|
||||||
+ getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is started.");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized UpgradeCommand processUpgradeCommand(UpgradeCommand command
|
|
||||||
) throws IOException {
|
|
||||||
if(NameNode.LOG.isDebugEnabled()) {
|
|
||||||
NameNode.LOG.debug("\n Distributed upgrade for NameNode version "
|
|
||||||
+ getUpgradeVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is processing upgrade command: "
|
|
||||||
+ command.getAction() + " status = " + getUpgradeStatus() + "%");
|
|
||||||
}
|
|
||||||
if(currentUpgrades == null) {
|
|
||||||
NameNode.LOG.info("Ignoring upgrade command: "
|
|
||||||
+ command.getAction() + " version " + command.getVersion()
|
|
||||||
+ ". No distributed upgrades are currently running on the NameNode");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
|
|
||||||
if(command.getVersion() != curUO.getVersion())
|
|
||||||
throw new IncorrectVersionException(command.getVersion(),
|
|
||||||
"UpgradeCommand", curUO.getVersion());
|
|
||||||
UpgradeCommand reply = curUO.processUpgradeCommand(command);
|
|
||||||
if(curUO.getUpgradeStatus() < 100) {
|
|
||||||
return reply;
|
|
||||||
}
|
|
||||||
// current upgrade is done
|
|
||||||
curUO.completeUpgrade();
|
|
||||||
NameNode.LOG.info("\n Distributed upgrade for NameNode version "
|
|
||||||
+ curUO.getVersion() + " to current LV "
|
|
||||||
+ HdfsConstants.LAYOUT_VERSION + " is complete.");
|
|
||||||
// proceede with the next one
|
|
||||||
currentUpgrades.remove(curUO);
|
|
||||||
if(currentUpgrades.isEmpty()) { // all upgrades are done
|
|
||||||
completeUpgrade();
|
|
||||||
} else { // start next upgrade
|
|
||||||
curUO = (UpgradeObjectNamenode)currentUpgrades.first();
|
|
||||||
this.broadcastCommand = curUO.startUpgrade();
|
|
||||||
}
|
|
||||||
return reply;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void completeUpgrade() throws IOException {
|
|
||||||
// set and write new upgrade state into disk
|
|
||||||
setUpgradeState(false, HdfsConstants.LAYOUT_VERSION);
|
|
||||||
namesystem.getFSImage().getStorage().writeAll();
|
|
||||||
currentUpgrades = null;
|
|
||||||
broadcastCommand = null;
|
|
||||||
namesystem.leaveSafeMode(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized UpgradeStatusReport distributedUpgradeProgress
|
|
||||||
(UpgradeAction action) throws IOException {
|
|
||||||
boolean isFinalized = false;
|
|
||||||
if(currentUpgrades == null) { // no upgrades are in progress
|
|
||||||
FSImage fsimage = namesystem.getFSImage();
|
|
||||||
isFinalized = fsimage.isUpgradeFinalized();
|
|
||||||
if(isFinalized) // upgrade is finalized
|
|
||||||
return null; // nothing to report
|
|
||||||
return new UpgradeStatusReport(fsimage.getStorage().getLayoutVersion(),
|
|
||||||
(short)101, isFinalized);
|
|
||||||
}
|
|
||||||
UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
|
|
||||||
boolean details = false;
|
|
||||||
switch(action) {
|
|
||||||
case GET_STATUS:
|
|
||||||
break;
|
|
||||||
case DETAILED_STATUS:
|
|
||||||
details = true;
|
|
||||||
break;
|
|
||||||
case FORCE_PROCEED:
|
|
||||||
curUO.forceProceed();
|
|
||||||
}
|
|
||||||
return curUO.getUpgradeStatusReport(details);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeObject;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Base class for name-node upgrade objects.
|
|
||||||
* Data-node upgrades are run in separate threads.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public abstract class UpgradeObjectNamenode extends UpgradeObject {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process an upgrade command.
|
|
||||||
* RPC has only one very generic command for all upgrade related inter
|
|
||||||
* component communications.
|
|
||||||
* The actual command recognition and execution should be handled here.
|
|
||||||
* The reply is sent back also as an UpgradeCommand.
|
|
||||||
*
|
|
||||||
* @param command
|
|
||||||
* @return the reply command which is analyzed on the client side.
|
|
||||||
*/
|
|
||||||
public abstract UpgradeCommand processUpgradeCommand(UpgradeCommand command
|
|
||||||
) throws IOException;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HdfsServerConstants.NodeType getType() {
|
|
||||||
return HdfsServerConstants.NodeType.NAME_NODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public UpgradeCommand startUpgrade() throws IOException {
|
|
||||||
// broadcast that data-nodes must start the upgrade
|
|
||||||
return new UpgradeCommand(UpgradeCommand.UC_ACTION_START_UPGRADE,
|
|
||||||
getVersion(), (short)0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void forceProceed() throws IOException {
|
|
||||||
// do nothing by default
|
|
||||||
NameNode.LOG.info("forceProceed() is not defined for the upgrade. "
|
|
||||||
+ getDescription());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.util.VersionInfo;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class NamespaceInfo extends StorageInfo {
|
public class NamespaceInfo extends StorageInfo {
|
||||||
String buildVersion;
|
String buildVersion;
|
||||||
int distributedUpgradeVersion;
|
|
||||||
String blockPoolID = ""; // id of the block pool
|
String blockPoolID = ""; // id of the block pool
|
||||||
String softwareVersion;
|
String softwareVersion;
|
||||||
|
|
||||||
|
@ -47,17 +46,16 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
||||||
long cT, int duVersion, String buildVersion, String softwareVersion) {
|
long cT, String buildVersion, String softwareVersion) {
|
||||||
super(HdfsConstants.LAYOUT_VERSION, nsID, clusterID, cT);
|
super(HdfsConstants.LAYOUT_VERSION, nsID, clusterID, cT);
|
||||||
blockPoolID = bpID;
|
blockPoolID = bpID;
|
||||||
this.buildVersion = buildVersion;
|
this.buildVersion = buildVersion;
|
||||||
this.distributedUpgradeVersion = duVersion;
|
|
||||||
this.softwareVersion = softwareVersion;
|
this.softwareVersion = softwareVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
||||||
long cT, int duVersion) {
|
long cT) {
|
||||||
this(nsID, clusterID, bpID, cT, duVersion, Storage.getBuildVersion(),
|
this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
|
||||||
VersionInfo.getVersion());
|
VersionInfo.getVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,10 +63,6 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
return buildVersion;
|
return buildVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getDistributedUpgradeVersion() {
|
|
||||||
return distributedUpgradeVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getBlockPoolID() {
|
public String getBlockPoolID() {
|
||||||
return blockPoolID;
|
return blockPoolID;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,8 +47,6 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
@ -303,15 +301,9 @@ public class DFSAdmin extends FsShell {
|
||||||
long remaining = ds.getRemaining();
|
long remaining = ds.getRemaining();
|
||||||
long presentCapacity = used + remaining;
|
long presentCapacity = used + remaining;
|
||||||
boolean mode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
|
boolean mode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
|
||||||
UpgradeStatusReport status =
|
|
||||||
dfs.distributedUpgradeProgress(UpgradeAction.GET_STATUS);
|
|
||||||
|
|
||||||
if (mode) {
|
if (mode) {
|
||||||
System.out.println("Safe mode is ON");
|
System.out.println("Safe mode is ON");
|
||||||
}
|
}
|
||||||
if (status != null) {
|
|
||||||
System.out.println(status.getStatusText(false));
|
|
||||||
}
|
|
||||||
System.out.println("Configured Capacity: " + capacity
|
System.out.println("Configured Capacity: " + capacity
|
||||||
+ " (" + StringUtils.byteDesc(capacity) + ")");
|
+ " (" + StringUtils.byteDesc(capacity) + ")");
|
||||||
System.out.println("Present Capacity: " + presentCapacity
|
System.out.println("Present Capacity: " + presentCapacity
|
||||||
|
@ -578,10 +570,6 @@ public class DFSAdmin extends FsShell {
|
||||||
"\t\tfollowed by Namenode doing the same.\n" +
|
"\t\tfollowed by Namenode doing the same.\n" +
|
||||||
"\t\tThis completes the upgrade process.\n";
|
"\t\tThis completes the upgrade process.\n";
|
||||||
|
|
||||||
String upgradeProgress = "-upgradeProgress <status|details|force>: \n" +
|
|
||||||
"\t\trequest current distributed upgrade status, \n" +
|
|
||||||
"\t\ta detailed status or force the upgrade to proceed.\n";
|
|
||||||
|
|
||||||
String metaSave = "-metasave <filename>: \tSave Namenode's primary data structures\n" +
|
String metaSave = "-metasave <filename>: \tSave Namenode's primary data structures\n" +
|
||||||
"\t\tto <filename> in the directory specified by hadoop.log.dir property.\n" +
|
"\t\tto <filename> in the directory specified by hadoop.log.dir property.\n" +
|
||||||
"\t\t<filename> will contain one line for each of the following\n" +
|
"\t\t<filename> will contain one line for each of the following\n" +
|
||||||
|
@ -643,8 +631,6 @@ public class DFSAdmin extends FsShell {
|
||||||
System.out.println(refreshNodes);
|
System.out.println(refreshNodes);
|
||||||
} else if ("finalizeUpgrade".equals(cmd)) {
|
} else if ("finalizeUpgrade".equals(cmd)) {
|
||||||
System.out.println(finalizeUpgrade);
|
System.out.println(finalizeUpgrade);
|
||||||
} else if ("upgradeProgress".equals(cmd)) {
|
|
||||||
System.out.println(upgradeProgress);
|
|
||||||
} else if ("metasave".equals(cmd)) {
|
} else if ("metasave".equals(cmd)) {
|
||||||
System.out.println(metaSave);
|
System.out.println(metaSave);
|
||||||
} else if (SetQuotaCommand.matches("-"+cmd)) {
|
} else if (SetQuotaCommand.matches("-"+cmd)) {
|
||||||
|
@ -681,7 +667,6 @@ public class DFSAdmin extends FsShell {
|
||||||
System.out.println(restoreFailedStorage);
|
System.out.println(restoreFailedStorage);
|
||||||
System.out.println(refreshNodes);
|
System.out.println(refreshNodes);
|
||||||
System.out.println(finalizeUpgrade);
|
System.out.println(finalizeUpgrade);
|
||||||
System.out.println(upgradeProgress);
|
|
||||||
System.out.println(metaSave);
|
System.out.println(metaSave);
|
||||||
System.out.println(SetQuotaCommand.DESCRIPTION);
|
System.out.println(SetQuotaCommand.DESCRIPTION);
|
||||||
System.out.println(ClearQuotaCommand.DESCRIPTION);
|
System.out.println(ClearQuotaCommand.DESCRIPTION);
|
||||||
|
@ -714,41 +699,6 @@ public class DFSAdmin extends FsShell {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Command to request current distributed upgrade status,
|
|
||||||
* a detailed status, or to force the upgrade to proceed.
|
|
||||||
*
|
|
||||||
* Usage: java DFSAdmin -upgradeProgress [status | details | force]
|
|
||||||
* @exception IOException
|
|
||||||
*/
|
|
||||||
public int upgradeProgress(String[] argv, int idx) throws IOException {
|
|
||||||
|
|
||||||
if (idx != argv.length - 1) {
|
|
||||||
printUsage("-upgradeProgress");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
UpgradeAction action;
|
|
||||||
if ("status".equalsIgnoreCase(argv[idx])) {
|
|
||||||
action = UpgradeAction.GET_STATUS;
|
|
||||||
} else if ("details".equalsIgnoreCase(argv[idx])) {
|
|
||||||
action = UpgradeAction.DETAILED_STATUS;
|
|
||||||
} else if ("force".equalsIgnoreCase(argv[idx])) {
|
|
||||||
action = UpgradeAction.FORCE_PROCEED;
|
|
||||||
} else {
|
|
||||||
printUsage("-upgradeProgress");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
DistributedFileSystem dfs = getDFS();
|
|
||||||
UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
|
|
||||||
String statusText = (status == null ?
|
|
||||||
"There are no upgrades in progress." :
|
|
||||||
status.getStatusText(action == UpgradeAction.DETAILED_STATUS));
|
|
||||||
System.out.println(statusText);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dumps DFS data structures into specified file.
|
* Dumps DFS data structures into specified file.
|
||||||
* Usage: java DFSAdmin -metasave filename
|
* Usage: java DFSAdmin -metasave filename
|
||||||
|
@ -918,9 +868,6 @@ public class DFSAdmin extends FsShell {
|
||||||
} else if ("-finalizeUpgrade".equals(cmd)) {
|
} else if ("-finalizeUpgrade".equals(cmd)) {
|
||||||
System.err.println("Usage: java DFSAdmin"
|
System.err.println("Usage: java DFSAdmin"
|
||||||
+ " [-finalizeUpgrade]");
|
+ " [-finalizeUpgrade]");
|
||||||
} else if ("-upgradeProgress".equals(cmd)) {
|
|
||||||
System.err.println("Usage: java DFSAdmin"
|
|
||||||
+ " [-upgradeProgress status | details | force]");
|
|
||||||
} else if ("-metasave".equals(cmd)) {
|
} else if ("-metasave".equals(cmd)) {
|
||||||
System.err.println("Usage: java DFSAdmin"
|
System.err.println("Usage: java DFSAdmin"
|
||||||
+ " [-metasave filename]");
|
+ " [-metasave filename]");
|
||||||
|
@ -969,7 +916,6 @@ public class DFSAdmin extends FsShell {
|
||||||
System.err.println(" [-restoreFailedStorage true|false|check]");
|
System.err.println(" [-restoreFailedStorage true|false|check]");
|
||||||
System.err.println(" [-refreshNodes]");
|
System.err.println(" [-refreshNodes]");
|
||||||
System.err.println(" [-finalizeUpgrade]");
|
System.err.println(" [-finalizeUpgrade]");
|
||||||
System.err.println(" [-upgradeProgress status | details | force]");
|
|
||||||
System.err.println(" [-metasave filename]");
|
System.err.println(" [-metasave filename]");
|
||||||
System.err.println(" [-refreshServiceAcl]");
|
System.err.println(" [-refreshServiceAcl]");
|
||||||
System.err.println(" [-refreshUserToGroupsMappings]");
|
System.err.println(" [-refreshUserToGroupsMappings]");
|
||||||
|
@ -1039,11 +985,6 @@ public class DFSAdmin extends FsShell {
|
||||||
printUsage(cmd);
|
printUsage(cmd);
|
||||||
return exitCode;
|
return exitCode;
|
||||||
}
|
}
|
||||||
} else if ("-upgradeProgress".equals(cmd)) {
|
|
||||||
if (argv.length != 2) {
|
|
||||||
printUsage(cmd);
|
|
||||||
return exitCode;
|
|
||||||
}
|
|
||||||
} else if ("-metasave".equals(cmd)) {
|
} else if ("-metasave".equals(cmd)) {
|
||||||
if (argv.length != 2) {
|
if (argv.length != 2) {
|
||||||
printUsage(cmd);
|
printUsage(cmd);
|
||||||
|
@ -1113,8 +1054,6 @@ public class DFSAdmin extends FsShell {
|
||||||
exitCode = refreshNodes();
|
exitCode = refreshNodes();
|
||||||
} else if ("-finalizeUpgrade".equals(cmd)) {
|
} else if ("-finalizeUpgrade".equals(cmd)) {
|
||||||
exitCode = finalizeUpgrade();
|
exitCode = finalizeUpgrade();
|
||||||
} else if ("-upgradeProgress".equals(cmd)) {
|
|
||||||
exitCode = upgradeProgress(argv, i);
|
|
||||||
} else if ("-metasave".equals(cmd)) {
|
} else if ("-metasave".equals(cmd)) {
|
||||||
exitCode = metaSave(argv, i);
|
exitCode = metaSave(argv, i);
|
||||||
} else if (ClearQuotaCommand.matches(cmd)) {
|
} else if (ClearQuotaCommand.matches(cmd)) {
|
||||||
|
|
|
@ -325,7 +325,7 @@ message RemoteEditLogManifestProto {
|
||||||
*/
|
*/
|
||||||
message NamespaceInfoProto {
|
message NamespaceInfoProto {
|
||||||
required string buildVersion = 1; // Software revision version (e.g. an svn or git revision)
|
required string buildVersion = 1; // Software revision version (e.g. an svn or git revision)
|
||||||
required uint32 distUpgradeVersion = 2; // Distributed upgrade version
|
required uint32 unused = 2; // Retained for backward compatibility
|
||||||
required string blockPoolID = 3; // block pool used by the namespace
|
required string blockPoolID = 3; // block pool used by the namespace
|
||||||
required StorageInfoProto storageInfo = 4;// Node information
|
required StorageInfoProto storageInfo = 4;// Node information
|
||||||
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
|
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
|
||||||
|
|
|
@ -75,9 +75,12 @@ public class TestHDFSCLI extends CLITestHelperDFS {
|
||||||
@After
|
@After
|
||||||
@Override
|
@Override
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (null != fs)
|
if (fs != null) {
|
||||||
fs.close();
|
fs.close();
|
||||||
dfsCluster.shutdown();
|
}
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -381,14 +381,12 @@ public class TestPBHelper {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertNamespaceInfo() {
|
public void testConvertNamespaceInfo() {
|
||||||
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300, 53);
|
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300);
|
||||||
NamespaceInfoProto proto = PBHelper.convert(info);
|
NamespaceInfoProto proto = PBHelper.convert(info);
|
||||||
NamespaceInfo info2 = PBHelper.convert(proto);
|
NamespaceInfo info2 = PBHelper.convert(proto);
|
||||||
compare(info, info2); //Compare the StorageInfo
|
compare(info, info2); //Compare the StorageInfo
|
||||||
assertEquals(info.getBlockPoolID(), info2.getBlockPoolID());
|
assertEquals(info.getBlockPoolID(), info2.getBlockPoolID());
|
||||||
assertEquals(info.getBuildVersion(), info2.getBuildVersion());
|
assertEquals(info.getBuildVersion(), info2.getBuildVersion());
|
||||||
assertEquals(info.getDistributedUpgradeVersion(),
|
|
||||||
info2.getDistributedUpgradeVersion());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void compare(StorageInfo expected, StorageInfo actual) {
|
private void compare(StorageInfo expected, StorageInfo actual) {
|
||||||
|
@ -440,7 +438,7 @@ public class TestPBHelper {
|
||||||
DatanodeRegistration reg2 = PBHelper.convert(proto);
|
DatanodeRegistration reg2 = PBHelper.convert(proto);
|
||||||
compare(reg.getStorageInfo(), reg2.getStorageInfo());
|
compare(reg.getStorageInfo(), reg2.getStorageInfo());
|
||||||
compare(reg.getExportedKeys(), reg2.getExportedKeys());
|
compare(reg.getExportedKeys(), reg2.getExportedKeys());
|
||||||
compare((DatanodeID)reg, (DatanodeID)reg2);
|
compare(reg, reg2);
|
||||||
assertEquals(reg.getSoftwareVersion(), reg2.getSoftwareVersion());
|
assertEquals(reg.getSoftwareVersion(), reg2.getSoftwareVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class TestDelegationToken {
|
||||||
@Test
|
@Test
|
||||||
public void testAddDelegationTokensDFSApi() throws Exception {
|
public void testAddDelegationTokensDFSApi() throws Exception {
|
||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("JobTracker");
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("JobTracker");
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
Credentials creds = new Credentials();
|
Credentials creds = new Credentials();
|
||||||
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||||
Assert.assertEquals(1, tokens.length);
|
Assert.assertEquals(1, tokens.length);
|
||||||
|
@ -198,7 +198,7 @@ public class TestDelegationToken {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenWithDoAs() throws Exception {
|
public void testDelegationTokenWithDoAs() throws Exception {
|
||||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
final Credentials creds = new Credentials();
|
final Credentials creds = new Credentials();
|
||||||
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||||
Assert.assertEquals(1, tokens.length);
|
Assert.assertEquals(1, tokens.length);
|
||||||
|
@ -212,8 +212,7 @@ public class TestDelegationToken {
|
||||||
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws IOException {
|
public Object run() throws IOException {
|
||||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
.getFileSystem();
|
|
||||||
try {
|
try {
|
||||||
//try renew with long name
|
//try renew with long name
|
||||||
dfs.renewDelegationToken(token);
|
dfs.renewDelegationToken(token);
|
||||||
|
@ -226,8 +225,7 @@ public class TestDelegationToken {
|
||||||
shortUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
shortUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws IOException {
|
public Object run() throws IOException {
|
||||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
.getFileSystem();
|
|
||||||
dfs.renewDelegationToken(token);
|
dfs.renewDelegationToken(token);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -235,8 +233,7 @@ public class TestDelegationToken {
|
||||||
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws IOException {
|
public Object run() throws IOException {
|
||||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
.getFileSystem();
|
|
||||||
try {
|
try {
|
||||||
//try cancel with long name
|
//try cancel with long name
|
||||||
dfs.cancelDelegationToken(token);
|
dfs.cancelDelegationToken(token);
|
||||||
|
@ -273,7 +270,7 @@ public class TestDelegationToken {
|
||||||
NameNodeAdapter.getDtSecretManager(nn.getNamesystem());
|
NameNodeAdapter.getDtSecretManager(nn.getNamesystem());
|
||||||
assertFalse("Secret manager should not run in safe mode", sm.isRunning());
|
assertFalse("Secret manager should not run in safe mode", sm.isRunning());
|
||||||
|
|
||||||
NameNodeAdapter.leaveSafeMode(nn, false);
|
NameNodeAdapter.leaveSafeMode(nn);
|
||||||
assertTrue("Secret manager should start when safe mode is exited",
|
assertTrue("Secret manager should start when safe mode is exited",
|
||||||
sm.isRunning());
|
sm.isRunning());
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
|
@ -111,10 +110,8 @@ public class TestBPOfferService {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
DatanodeProtocolClientSideTranslatorPB mock =
|
DatanodeProtocolClientSideTranslatorPB mock =
|
||||||
Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
|
Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
|
||||||
Mockito.doReturn(
|
Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0))
|
||||||
new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
|
.when(mock).versionRequest();
|
||||||
0, HdfsConstants.LAYOUT_VERSION))
|
|
||||||
.when(mock).versionRequest();
|
|
||||||
|
|
||||||
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
|
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
|
||||||
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
|
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
|
||||||
|
@ -229,10 +226,9 @@ public class TestBPOfferService {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testNNsFromDifferentClusters() throws Exception {
|
public void testNNsFromDifferentClusters() throws Exception {
|
||||||
Mockito.doReturn(
|
Mockito
|
||||||
new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID,
|
.doReturn(new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, 0))
|
||||||
0, HdfsConstants.LAYOUT_VERSION))
|
.when(mockNN1).versionRequest();
|
||||||
.when(mockNN1).versionRequest();
|
|
||||||
|
|
||||||
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
||||||
bpos.start();
|
bpos.start();
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class TestBlockRecovery {
|
||||||
Mockito.any(DatanodeRegistration.class));
|
Mockito.any(DatanodeRegistration.class));
|
||||||
|
|
||||||
when(namenode.versionRequest()).thenReturn(new NamespaceInfo
|
when(namenode.versionRequest()).thenReturn(new NamespaceInfo
|
||||||
(1, CLUSTER_ID, POOL_ID, 1L, 1));
|
(1, CLUSTER_ID, POOL_ID, 1L));
|
||||||
|
|
||||||
when(namenode.sendHeartbeat(
|
when(namenode.sendHeartbeat(
|
||||||
Mockito.any(DatanodeRegistration.class),
|
Mockito.any(DatanodeRegistration.class),
|
||||||
|
|
|
@ -239,8 +239,7 @@ public class TestDirectoryScanner {
|
||||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||||
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
||||||
parallelism);
|
parallelism);
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
scanner = new DirectoryScanner(fds, CONF);
|
||||||
scanner = new DirectoryScanner(dn, fds, CONF);
|
|
||||||
scanner.setRetainDiffs(true);
|
scanner.setRetainDiffs(true);
|
||||||
|
|
||||||
// Add files with 100 blocks
|
// Add files with 100 blocks
|
||||||
|
|
|
@ -81,9 +81,8 @@ public class NameNodeAdapter {
|
||||||
namenode.getNamesystem().enterSafeMode(resourcesLow);
|
namenode.getNamesystem().enterSafeMode(resourcesLow);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void leaveSafeMode(NameNode namenode, boolean checkForUpgrades)
|
public static void leaveSafeMode(NameNode namenode) {
|
||||||
throws SafeModeException {
|
namenode.getNamesystem().leaveSafeMode();
|
||||||
namenode.getNamesystem().leaveSafeMode(checkForUpgrades);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void abortEditLogs(NameNode nn) {
|
public static void abortEditLogs(NameNode nn) {
|
||||||
|
|
|
@ -184,10 +184,7 @@ public class TestEditLogRace {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
fileSys = cluster.getFileSystem();
|
fileSys = cluster.getFileSystem();
|
||||||
final FSNamesystem namesystem = cluster.getNamesystem();
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
|
||||||
FSImage fsimage = namesystem.getFSImage();
|
FSImage fsimage = namesystem.getFSImage();
|
||||||
FSEditLog editLog = fsimage.getEditLog();
|
|
||||||
|
|
||||||
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
|
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
|
||||||
|
|
||||||
startTransactionWorkers(namesystem, caughtErr);
|
startTransactionWorkers(namesystem, caughtErr);
|
||||||
|
@ -306,7 +303,7 @@ public class TestEditLogRace {
|
||||||
assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(),
|
assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(),
|
||||||
editLog.getLastWrittenTxId() - 1);
|
editLog.getLastWrittenTxId() - 1);
|
||||||
|
|
||||||
namesystem.leaveSafeMode(false);
|
namesystem.leaveSafeMode();
|
||||||
LOG.info("Save " + i + ": complete");
|
LOG.info("Save " + i + ": complete");
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class TestBootstrapStandby {
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void shutdownCluster() throws IOException {
|
public void shutdownCluster() {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -125,7 +125,7 @@ public class TestBootstrapStandby {
|
||||||
// Make checkpoint
|
// Make checkpoint
|
||||||
NameNodeAdapter.enterSafeMode(nn0, false);
|
NameNodeAdapter.enterSafeMode(nn0, false);
|
||||||
NameNodeAdapter.saveNamespace(nn0);
|
NameNodeAdapter.saveNamespace(nn0);
|
||||||
NameNodeAdapter.leaveSafeMode(nn0, false);
|
NameNodeAdapter.leaveSafeMode(nn0);
|
||||||
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
|
long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
|
||||||
.getFSImage().getMostRecentCheckpointTxId();
|
.getFSImage().getMostRecentCheckpointTxId();
|
||||||
assertEquals(6, expectedCheckpointTxId);
|
assertEquals(6, expectedCheckpointTxId);
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class TestHASafeMode {
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void shutdownCluster() throws IOException {
|
public void shutdownCluster() {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -408,7 +408,7 @@ public class TestHASafeMode {
|
||||||
4*BLOCK_SIZE, (short) 3, 1L);
|
4*BLOCK_SIZE, (short) 3, 1L);
|
||||||
NameNodeAdapter.enterSafeMode(nn0, false);
|
NameNodeAdapter.enterSafeMode(nn0, false);
|
||||||
NameNodeAdapter.saveNamespace(nn0);
|
NameNodeAdapter.saveNamespace(nn0);
|
||||||
NameNodeAdapter.leaveSafeMode(nn0, false);
|
NameNodeAdapter.leaveSafeMode(nn0);
|
||||||
|
|
||||||
// OP_ADD for 2 blocks
|
// OP_ADD for 2 blocks
|
||||||
DFSTestUtil.createFile(fs, new Path("/test2"),
|
DFSTestUtil.createFile(fs, new Path("/test2"),
|
||||||
|
|
|
@ -27,7 +27,6 @@ import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -317,8 +316,7 @@ public class TestHAStateTransitions {
|
||||||
* Test that delegation tokens continue to work after the failover.
|
* Test that delegation tokens continue to work after the failover.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokensAfterFailover() throws IOException,
|
public void testDelegationTokensAfterFailover() throws IOException {
|
||||||
URISyntaxException {
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(
|
conf.setBoolean(
|
||||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||||
|
@ -472,7 +470,7 @@ public class TestHAStateTransitions {
|
||||||
assertFalse(isDTRunning(nn));
|
assertFalse(isDTRunning(nn));
|
||||||
|
|
||||||
banner("Transition 1->2. Should not start secret manager");
|
banner("Transition 1->2. Should not start secret manager");
|
||||||
NameNodeAdapter.leaveSafeMode(nn, false);
|
NameNodeAdapter.leaveSafeMode(nn);
|
||||||
assertTrue(nn.isStandbyState());
|
assertTrue(nn.isStandbyState());
|
||||||
assertFalse(nn.isInSafeMode());
|
assertFalse(nn.isInSafeMode());
|
||||||
assertFalse(isDTRunning(nn));
|
assertFalse(isDTRunning(nn));
|
||||||
|
@ -497,7 +495,7 @@ public class TestHAStateTransitions {
|
||||||
|
|
||||||
banner("Transition 1->3->4. Should start secret manager.");
|
banner("Transition 1->3->4. Should start secret manager.");
|
||||||
nn.getRpcServer().transitionToActive(REQ_INFO);
|
nn.getRpcServer().transitionToActive(REQ_INFO);
|
||||||
NameNodeAdapter.leaveSafeMode(nn, false);
|
NameNodeAdapter.leaveSafeMode(nn);
|
||||||
assertFalse(nn.isStandbyState());
|
assertFalse(nn.isStandbyState());
|
||||||
assertFalse(nn.isInSafeMode());
|
assertFalse(nn.isInSafeMode());
|
||||||
assertTrue(isDTRunning(nn));
|
assertTrue(isDTRunning(nn));
|
||||||
|
@ -509,7 +507,7 @@ public class TestHAStateTransitions {
|
||||||
assertFalse(isDTRunning(nn));
|
assertFalse(isDTRunning(nn));
|
||||||
|
|
||||||
banner("Transition 3->4. Should start secret manager");
|
banner("Transition 3->4. Should start secret manager");
|
||||||
NameNodeAdapter.leaveSafeMode(nn, false);
|
NameNodeAdapter.leaveSafeMode(nn);
|
||||||
assertFalse(nn.isStandbyState());
|
assertFalse(nn.isStandbyState());
|
||||||
assertFalse(nn.isInSafeMode());
|
assertFalse(nn.isInSafeMode());
|
||||||
assertTrue(isDTRunning(nn));
|
assertTrue(isDTRunning(nn));
|
||||||
|
|
|
@ -15269,29 +15269,6 @@
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
||||||
<test> <!--Tested -->
|
|
||||||
<description>help: help for dfsadmin upgradeProgress</description>
|
|
||||||
<test-commands>
|
|
||||||
<dfs-admin-command>-fs NAMENODE -help upgradeProgress</dfs-admin-command>
|
|
||||||
</test-commands>
|
|
||||||
<cleanup-commands>
|
|
||||||
</cleanup-commands>
|
|
||||||
<comparators>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^-upgradeProgress <status\|details\|force>:( )*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^( |\t)*request current distributed upgrade status,( )*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
<comparator>
|
|
||||||
<type>RegexpComparator</type>
|
|
||||||
<expected-output>^( |\t)*a detailed status or force the upgrade to proceed.( )*</expected-output>
|
|
||||||
</comparator>
|
|
||||||
</comparators>
|
|
||||||
</test>
|
|
||||||
|
|
||||||
<test> <!--Tested -->
|
<test> <!--Tested -->
|
||||||
<description>help: help for dfsadmin metasave</description>
|
<description>help: help for dfsadmin metasave</description>
|
||||||
<test-commands>
|
<test-commands>
|
||||||
|
|
Loading…
Reference in New Issue