HDFS-4988. Datanode must support all the volumes as individual storages.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1526969 13f79535-47bb-0310-9956-ffa450edef68
Parent: 8062d8c239
Commit: 46099ce7f1
@@ -33,3 +33,6 @@ IMPROVEMENTS:
 
     HDFS-5222. Move block schedule information from DatanodeDescriptor to
     DatanodeStorageInfo. (szetszwo)
 
+    HDFS-4988. Datanode must support all the volumes as individual storages.
+    (Arpit Agarwal)
+
@@ -106,7 +106,9 @@ public class LayoutVersion {
     SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
         "block IDs in the edits log and image files"),
     EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to "
-        + "enable rebuilding retry cache in case of HA failover");
+        + "enable rebuilding retry cache in case of HA failover"),
+    DATANODE_ID(-48, "UUID per Datanode and distinct StorageID per storage "
+        + "directory.");
 
     final int lv;
     final int ancestorLV;
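The DATANODE_ID layout feature added above is what later lets DataStorage decide whether an on-disk VERSION file may carry the new datanodeUuid property (see the setPropertiesFromFields hunk in DataStorage further down). Below is a minimal sketch of that gating pattern; it is not part of the patch, and the helper class, method name, import paths, and sample values are assumptions for illustration only.

import java.util.Properties;

import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;

class DatanodeUuidPropertySketch {
  // Sketch only: write the datanodeUuid property only when the layout
  // version is new enough to understand Feature.DATANODE_ID (-48);
  // older layouts keep only the legacy storageID property.
  static void maybeSetDatanodeUuid(Properties props, int layoutVersion,
      String datanodeUuid) {
    if (LayoutVersion.supports(Feature.DATANODE_ID, layoutVersion)
        && datanodeUuid != null) {
      props.setProperty("datanodeUuid", datanodeUuid);
    }
  }
}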
@@ -231,11 +231,14 @@ public class PBHelper {
   }
 
   public static DatanodeIDProto convert(DatanodeID dn) {
+    // For wire compatibility with older versions we transmit the StorageID
+    // which is the same as the DatanodeUuid. Since StorageID is a required
+    // field we pass the empty string if the DatanodeUuid is not yet known.
     return DatanodeIDProto.newBuilder()
         .setIpAddr(dn.getIpAddr())
         .setHostName(dn.getHostName())
-        .setDatanodeUuid(dn.getDatanodeUuid())
         .setXferPort(dn.getXferPort())
+        .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
         .setInfoPort(dn.getInfoPort())
         .setIpcPort(dn.getIpcPort()).build();
   }
@@ -420,6 +420,10 @@ public class DatanodeManager {
 
   /** Get a datanode descriptor given corresponding DatanodeUUID */
   DatanodeDescriptor getDatanode(final String datanodeUuid) {
+    if (datanodeUuid == null) {
+      return null;
+    }
+
     return datanodeMap.get(datanodeUuid);
   }
 
@@ -776,7 +780,7 @@ public class DatanodeManager {
     NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
         + nodeReg + " storage " + nodeReg.getDatanodeUuid());
 
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getDatanodeUuid());
+    DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
     DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
         nodeReg.getIpAddr(), nodeReg.getXferPort());
 
@@ -843,13 +847,13 @@ public class DatanodeManager {
         }
       }
       return;
     }
 
-    // this is a new datanode serving a new data storage
-    if ("".equals(nodeReg.getDatanodeUuid())) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.setDatanodeUuid(DatanodeStorage.newStorageID());
-    }
+    // This is a new datanode.
+    if (nodeReg.getDatanodeUuid() == null ||
+        nodeReg.getDatanodeUuid().isEmpty()) {
+      // this data node has never been registered
+      nodeReg.generateNewDatanodeUuid();
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
             "BLOCK* NameSystem.registerDatanode: "
@@ -236,6 +236,9 @@ public abstract class Storage extends StorageInfo {
     final boolean useLock; // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
     FileLock lock; // storage lock
 
+    //TODO HDFS-2832: Consider moving this out of StorageDirectory.
+    String storageUuid = null; // Storage directory identifier.
+
     public StorageDirectory(File dir) {
       // default dirType is null
 
@@ -246,6 +249,14 @@ public abstract class Storage extends StorageInfo {
       this(dir, dirType, true);
     }
 
+    public void setStorageUuid(String storageUuid) {
+      this.storageUuid = storageUuid;
+    }
+
+    public String getStorageUuid() {
+      return storageUuid;
+    }
+
     /**
      * Constructor
      * @param dir directory corresponding to the storage
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 
@@ -257,6 +258,7 @@ class BPServiceActor implements Runnable {
 
   /**
    * Report received blocks and delete hints to the Namenode
+   * TODO: Fix reportReceivedDeletedBlocks to send reports per-volume.
    *
    * @throws IOException
    */
 
@@ -388,23 +390,40 @@ class BPServiceActor implements Runnable {
     // a FINALIZED one.
     reportReceivedDeletedBlocks();
 
+    // Send one block report per known storage.
+
     // Create block report
     long brCreateStartTime = now();
-    BlockListAsLongs bReport = dn.getFSDataset().getBlockReport(
-        bpos.getBlockPoolId());
+    long totalBlockCount = 0;
+
+    Map<String, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
 
     // Send block report
     long brSendStartTime = now();
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(bpRegistration.getDatanodeUuid()),
-        bReport.getBlockListAsLongs()) };
-    cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
+    StorageBlockReport[] reports =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+
+    int i = 0;
+    for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      String storageID = kvPair.getKey();
+      BlockListAsLongs blockList = kvPair.getValue();
+      totalBlockCount += blockList.getNumberOfBlocks();
+
+      // Dummy DatanodeStorage object just for sending the block report.
+      DatanodeStorage dnStorage = new DatanodeStorage(storageID);
+      reports[i++] =
+          new StorageBlockReport(
+              dnStorage, blockList.getBlockListAsLongs());
+    }
+
+    cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
 
     // Log the block report processing stats from Datanode perspective
     long brSendCost = now() - brSendStartTime;
     long brCreateCost = brSendStartTime - brCreateStartTime;
     dn.getMetrics().addBlockReport(brSendCost);
-    LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+    LOG.info("BlockReport of " + totalBlockCount
         + " blocks took " + brCreateCost + " msec to generate and "
         + brSendCost + " msecs for RPC and NN processing");
 
@@ -517,7 +517,7 @@ public class DataNode extends Configured
       directoryScanner.start();
     } else {
       LOG.info("Periodic Directory Tree Verification scan is disabled because " +
-          reason);
+          reason);
     }
   }
 
@@ -750,7 +750,7 @@ public class DataNode extends Configured
     }
     DatanodeID dnId = new DatanodeID(
         streamingAddr.getAddress().getHostAddress(), hostName,
-        getDatanodeUuid(), getXferPort(), getInfoPort(), getIpcPort());
+        storage.getDatanodeUuid(), getXferPort(), getInfoPort(), getIpcPort());
     return new DatanodeRegistration(dnId, storageInfo,
         new ExportedBlockKeys(), VersionInfo.getVersion());
   }
 
@@ -768,16 +768,16 @@ public class DataNode extends Configured
       id = bpRegistration;
     }
 
-    if (storage.getStorageID().equals("")) {
-      // This is a fresh datanode, persist the NN-provided storage ID
-      storage.setStorageID(bpRegistration.getDatanodeUuid());
+    if (storage.getDatanodeUuid() == null) {
+      // This is a fresh datanode, persist the NN-provided Datanode ID
+      storage.setDatanodeUuid(bpRegistration.getDatanodeUuid());
       storage.writeAll();
-      LOG.info("New storage id " + bpRegistration.getDatanodeUuid()
-          + " is assigned to data-node " + bpRegistration);
-    } else if(!storage.getStorageID().equals(bpRegistration.getDatanodeUuid())) {
-      throw new IOException("Inconsistent storage IDs. Name-node returned "
+      LOG.info("Datanode ID " + bpRegistration.getDatanodeUuid()
+          + " is assigned to new storage " + bpRegistration);
+    } else if(!storage.getDatanodeUuid().equals(bpRegistration.getDatanodeUuid())) {
+      throw new IOException("Inconsistent Datanode IDs. Name-node returned "
           + bpRegistration.getDatanodeUuid()
-          + ". Expecting " + storage.getStorageID());
+          + ". Expecting " + storage.getDatanodeUuid());
     }
 
     registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
 
@@ -925,10 +925,6 @@ public class DataNode extends Configured
     return streamingAddr.getPort();
   }
 
-  String getDatanodeUuid() {
-    return storage.getStorageID();
-  }
-
   /**
    * @return name useful for logging
    */
 
@@ -1014,11 +1010,6 @@ public class DataNode extends Configured
     return metrics;
   }
 
-  public static void setNewStorageID(DatanodeID dnId) {
-    LOG.info("Datanode is " + dnId);
-    dnId.setDatanodeUuid(DatanodeStorage.newStorageID());
-  }
-
   /** Ensure the authentication method is kerberos */
   private void checkKerberosAuthMethod(String msg) throws IOException {
     // User invoking the call must be same as the datanode user
 
@@ -1818,7 +1809,7 @@ public class DataNode extends Configured
   @Override
   public String toString() {
     return "DataNode{data=" + data + ", localName='" + getDisplayName()
-        + "', storageID='" + getDatanodeUuid() + "', xmitsInProgress="
+        + "', datanodeUuid='" + storage.getDatanodeUuid() + "', xmitsInProgress="
        + xmitsInProgress.get() + "}";
   }
 
@@ -1872,7 +1863,6 @@ public class DataNode extends Configured
   }
 
   /**
    * This method is used for testing.
    * Examples are adding and deleting blocks directly.
    * The most common usage will be when the data node's storage is simulated.
    *
 
@@ -2425,6 +2415,10 @@ public class DataNode extends Configured
     return dnConf;
   }
 
+  public String getDatanodeUuid() {
+    return id == null ? null : id.getDatanodeUuid();
+  }
+
   boolean shouldRun() {
     return shouldRun;
   }
@@ -24,13 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 
@@ -72,8 +66,13 @@ public class DataStorage extends Storage {
   public final static String STORAGE_DIR_FINALIZED = "finalized";
   public final static String STORAGE_DIR_TMP = "tmp";
 
-  /** Unique storage ID. {@see DataNode#createNewStorageId(int)} for details */
-  private String storageID;
+  /**
+   * Datanode UUID that this storage is currently attached to. This
+   * is the same as the legacy StorageID for datanodes that were
+   * upgraded from a pre-UUID version. For compatibility with prior
+   * versions of Datanodes we cannot make this field a UUID.
+   */
+  private String datanodeUuid = null;
 
   // Flag to ensure we only initialize storage once
   private boolean initialized = false;
 
@@ -85,33 +84,29 @@ public class DataStorage extends Storage {
 
   DataStorage() {
     super(NodeType.DATA_NODE);
-    storageID = "";
   }
 
   public StorageInfo getBPStorage(String bpid) {
     return bpStorageMap.get(bpid);
   }
 
-  public DataStorage(StorageInfo storageInfo, String strgID) {
+  public DataStorage(StorageInfo storageInfo) {
     super(NodeType.DATA_NODE, storageInfo);
-    this.storageID = strgID;
   }
 
-  /** @return storage ID. */
-  public synchronized String getStorageID() {
-    return storageID;
+  public synchronized String getDatanodeUuid() {
+    return datanodeUuid;
   }
 
-  synchronized void setStorageID(String newStorageID) {
-    this.storageID = newStorageID;
+  synchronized void setDatanodeUuid(String newDatanodeUuid) {
+    this.datanodeUuid = newDatanodeUuid;
   }
 
   /** Create an ID for this storage. */
-  public synchronized void createStorageID() {
-    if (storageID != null && !storageID.isEmpty()) {
-      return;
+  public synchronized void createStorageID(StorageDirectory sd) {
+    if (sd.getStorageUuid() == null) {
+      sd.setStorageUuid(DatanodeStorage.newStorageID());
     }
-    storageID = DatanodeStorage.newStorageID();
   }
 
   /**
@@ -164,7 +159,7 @@ public class DataStorage extends Storage {
     case NOT_FORMATTED: // format
       LOG.info("Storage directory " + dataDir + " is not formatted");
       LOG.info("Formatting ...");
-      format(sd, nsInfo);
+      format(sd, nsInfo, datanode.getDatanodeUuid());
       break;
     default: // recovery part is common
       sd.doRecover(curState);
 
@@ -193,11 +188,9 @@ public class DataStorage extends Storage {
       doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
         "Data-node and name-node layout versions must be the same.";
+      createStorageID(getStorageDir(idx));
     }
 
-    // make sure we have storage id set - if not - generate new one
-    createStorageID();
-
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
 
@@ -265,19 +258,28 @@ public class DataStorage extends Storage {
     }
   }
 
-  void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+  void format(StorageDirectory sd, NamespaceInfo nsInfo,
+              String datanodeUuid) throws IOException {
     sd.clearDirectory(); // create directory
     this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
     this.clusterID = nsInfo.getClusterID();
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
-    // store storageID as it currently is
+    this.datanodeUuid = datanodeUuid;
+
+    if (sd.getStorageUuid() == null) {
+      // Assign a new Storage UUID.
+      sd.setStorageUuid(UUID.randomUUID().toString());
+    }
+
     writeProperties(sd);
   }
 
   /*
    * Set ClusterID, StorageID, StorageType, CTime into
-   * DataStorage VERSION file
+   * DataStorage VERSION file.
+   * Always called just before writing the properties to
+   * the VERSION file.
    */
   @Override
   protected void setPropertiesFromFields(Properties props,
 
@@ -287,7 +289,13 @@ public class DataStorage extends Storage {
     props.setProperty("clusterID", clusterID);
     props.setProperty("cTime", String.valueOf(cTime));
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
-    props.setProperty("storageID", getStorageID());
+    props.setProperty("storageID", sd.getStorageUuid());
+
+    if (LayoutVersion.supports(Feature.DATANODE_ID, layoutVersion) &&
+        datanodeUuid != null) {
+      props.setProperty("datanodeUuid", datanodeUuid);
+    }
+
     // Set NamespaceID in version before federation
     if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
       props.setProperty("namespaceID", String.valueOf(namespaceID));
 
@@ -297,6 +305,8 @@ public class DataStorage extends Storage {
   /*
    * Read ClusterID, StorageID, StorageType, CTime from
    * DataStorage VERSION file and verify them.
+   * Always called just after reading the properties from the VERSION
+   * file.
    */
   @Override
   protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
 
@@ -311,20 +321,36 @@ public class DataStorage extends Storage {
       setNamespaceID(props, sd);
     }
 
     // valid storage id, storage id may be empty
     String ssid = props.getProperty("storageID");
     if (ssid == null) {
       throw new InconsistentFSStateException(sd.getRoot(), "file "
           + STORAGE_FILE_VERSION + " is invalid.");
     }
-    String sid = getStorageID();
-    if (!(sid.equals("") || ssid.equals("") || sid.equals(ssid))) {
+    String sid = sd.getStorageUuid();
+    if (!(sid == null || sid.equals("") ||
+          ssid.equals("") || sid.equals(ssid))) {
       throw new InconsistentFSStateException(sd.getRoot(),
           "has incompatible storage Id.");
     }
 
-    if (sid.equals("")) { // update id only if it was empty
-      setStorageID(ssid);
+    if (sid == null) { // update id only if it was null
+      sd.setStorageUuid(ssid);
    }
 
+    // Update the datanode UUID if present.
+    if (props.getProperty("datanodeUuid") != null) {
+      String dnUuid = props.getProperty("datanodeUuid");
+
+      if (getDatanodeUuid() == null) {
+        setDatanodeUuid(dnUuid);
+      } else if (getDatanodeUuid().compareTo(dnUuid) != 0) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "Root " + sd.getRoot() + ": DatanodeUuid=" + dnUuid +
+            ", does not match " + datanodeUuid + " from other" +
+            " StorageDirectory.");
+      }
+    }
   }
 
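Taken together, the DataStorage and StorageDirectory changes split what used to be one storage ID into two identifiers: a single Datanode UUID per node, held by DataStorage, and one storage UUID per directory (that is, per volume), held by each StorageDirectory. A minimal sketch of reading the two once the storage directories have been loaded; it is not part of the patch, the helper method is hypothetical, and it only uses accessors visible in the hunks above:

  // Sketch only: one Datanode UUID for the node, one storage UUID per
  // directory (volume). 'storage' is assumed to have loaded its directories.
  static void printIdentifiers(DataStorage storage) {
    System.out.println("Datanode UUID: " + storage.getDatanodeUuid());
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      Storage.StorageDirectory sd = storage.getStorageDir(idx);
      storage.createStorageID(sd);  // assigns a new UUID only if the dir has none
      System.out.println("  " + sd.getRoot() + " -> storage UUID "
          + sd.getStorageUuid());
    }
  }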
@@ -92,9 +92,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /** @return a volume information map (name => info). */
   public Map<String, Object> getVolumeInfoMap();
 
-  /** @return a list of block pools. */
-  public String[] getBlockPoolList();
-
   /** @return a list of finalized blocks for the given block pool. */
   public List<Block> getFinalizedBlocks(String bpid);
 
@@ -262,6 +259,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public void unfinalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
+   * TODO HDFS-2832: Deprecate this when we fix tests.
    * Returns the block report - the full list of blocks stored under a
    * block pool
   * @param bpid Block Pool Id
 
@@ -269,6 +267,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public BlockListAsLongs getBlockReport(String bpid);
 
+  /**
+   * Returns one block report per volume.
+   * @param bpid Block Pool Id
+   * @return - a map of StorageID to block report for the volume.
+   */
+  public Map<String, BlockListAsLongs> getBlockReports(String bpid);
+
   /** Does the dataset contain the block? */
   public boolean contains(ExtendedBlock block);
 
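One way to read the new contract: the per-storage reports, taken together, cover the same replicas as the old aggregate getBlockReport(bpid), which is why BPServiceActor above can sum their sizes for its log line. A hedged, test-style sketch follows; it is not part of the patch, and the helper method, dataset instance, and block pool id are assumed to come from surrounding test setup:

  // Sketch only: relate the new per-storage API to the old aggregate one.
  static void checkReportsAddUp(FsDatasetSpi<?> dataset, String bpid) {
    long totalAcrossStorages = 0;
    for (Map.Entry<String, BlockListAsLongs> entry :
        dataset.getBlockReports(bpid).entrySet()) {
      // Key is the storage (volume) ID; value is that volume's block report.
      totalAcrossStorages += entry.getValue().getNumberOfBlocks();
    }
    // The per-storage reports should add up to the aggregate report.
    assert totalAcrossStorages == dataset.getBlockReport(bpid).getNumberOfBlocks();
  }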
@@ -43,7 +43,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 
@@ -53,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 
@@ -170,10 +170,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   final DataNode datanode;
   final FsVolumeList volumes;
-  final ReplicaMap volumeMap;
   final FsDatasetAsyncDiskService asyncDiskService;
   private final int validVolsRequired;
 
+  // TODO HDFS-2832: Consider removing duplicated block info from these
+  // two maps. This might require some refactoring
+  // rewrite of FsDatasetImpl.
+  final ReplicaMap volumeMap;
+  final Map<FsVolumeImpl, ReplicaMap> perVolumeReplicaMap;
+
 
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
@@ -211,16 +217,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
         storage.getNumStorageDirs());
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      final File dir = storage.getStorageDir(idx).getCurrentDir();
       // TODO: getStorageTypeFromLocations() is only a temporary workaround and
       // should be replaced with getting storage type from DataStorage (missing
       // storage type now) directly.
+      Storage.StorageDirectory sd = storage.getStorageDir(idx);
+      final File dir = sd.getCurrentDir();
       final StorageType storageType = getStorageTypeFromLocations(dataLocations, dir);
-      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf,
+      volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
           storageType));
       LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
     }
     volumeMap = new ReplicaMap(this);
+    perVolumeReplicaMap = new HashMap<FsVolumeImpl, ReplicaMap>();
 
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -229,14 +237,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         RoundRobinVolumeChoosingPolicy.class,
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
-    volumes.getVolumeMap(volumeMap);
+    volumes.initializeReplicaMaps(perVolumeReplicaMap, volumeMap, this);
 
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
     }
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
-    registerMBean(storage.getStorageID());
+    registerMBean(datanode.getDatanodeUuid());
   }
 
   private StorageType getStorageTypeFromLocations(
 
@@ -326,9 +334,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   File getBlockFile(String bpid, Block b) throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
-      }
       throw new IOException("Block " + b + " is not valid.");
     }
     return f;
 
@@ -602,6 +607,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
+    perVolumeReplicaMap.get(v).add(bpid, newReplicaInfo);
 
     return newReplicaInfo;
   }
 
@@ -731,6 +737,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+    perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -849,6 +856,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     rbw.setBytesAcked(visible);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
+    perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), rbw);
     return rbw;
   }
 
@@ -868,6 +876,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+    perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), newReplicaInfo);
 
     return newReplicaInfo;
   }
 
@@ -936,6 +945,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(bpid, newReplicaInfo);
+    perVolumeReplicaMap.get(newReplicaInfo.getVolume()).add(bpid, newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -949,6 +959,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
+      perVolumeReplicaMap.get((FsVolumeImpl) replicaInfo.getVolume())
+          .remove(b.getBlockPoolId(), b.getLocalBlock());
 
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(),
 
@@ -983,12 +995,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return true;
   }
 
-  /**
-   * Generates a block report from the in-memory block map.
-   */
-  @Override // FsDatasetSpi
-  public BlockListAsLongs getBlockReport(String bpid) {
-    int size = volumeMap.size(bpid);
+  private BlockListAsLongs getBlockReportWithReplicaMap(
+      String bpid, ReplicaMap rMap) {
+    int size = rMap.size(bpid);
     ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
     ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
     if (size == 0) {
 
@@ -996,7 +1005,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     synchronized(this) {
-      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+      for (ReplicaInfo b : rMap.replicas(bpid)) {
         switch(b.getState()) {
           case FINALIZED:
             finalized.add(b);
@@ -1019,6 +1028,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  /**
+   * Generates a block report from the in-memory block map.
+   */
+  @Override // FsDatasetSpi
+  public BlockListAsLongs getBlockReport(String bpid) {
+    return getBlockReportWithReplicaMap(bpid, volumeMap);
+  }
+
+  @Override
+  public Map<String, BlockListAsLongs> getBlockReports(String bpid) {
+    Map<String, BlockListAsLongs> blockReportMap =
+        new HashMap<String, BlockListAsLongs>();
+
+    for (FsVolumeImpl v : getVolumes()) {
+      ReplicaMap rMap = perVolumeReplicaMap.get(v);
+      BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap);
+      blockReportMap.put(v.getStorageID(), blockList);
+    }
+
+    return blockReportMap;
+  }
+
   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
 
@@ -1159,6 +1190,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         v.clearPath(bpid, parent);
       }
       volumeMap.remove(bpid, invalidBlks[i]);
+      perVolumeReplicaMap.get(v).remove(bpid, invalidBlks[i]);
     }
 
     // Delete the block asynchronously to make sure we can do it fast enough
 
@@ -1220,6 +1252,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
             + " on failed volume " + fv.getCurrentDir().getAbsolutePath());
         ib.remove();
+        perVolumeReplicaMap.get(fv).remove(bpid, b.getBlockId());
         removedBlocks++;
       }
     }
 
@@ -1248,22 +1281,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   /**
    * Register the FSDataset MBean using the name
-   * "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
+   * "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>"
    */
-  void registerMBean(final String storageId) {
+  void registerMBean(final String datanodeUuid) {
     // We wrap to bypass standard mbean naming convetion.
     // This wraping can be removed in java 6 as it is more flexible in
     // package naming for mbeans and their impl.
-    StandardMBean bean;
-    String storageName;
-    if (storageId == null || storageId.equals("")) {// Temp fix for the uninitialized storage
-      storageName = "UndefinedStorageId" + DFSUtil.getRandom().nextInt();
-    } else {
-      storageName = storageId;
-    }
     try {
-      bean = new StandardMBean(this,FSDatasetMBean.class);
-      mbeanName = MBeans.register("DataNode", "FSDatasetState-" + storageName, bean);
+      StandardMBean bean = new StandardMBean(this,FSDatasetMBean.class);
+      mbeanName = MBeans.register("DataNode", "FSDatasetState-" + datanodeUuid, bean);
     } catch (NotCompliantMBeanException e) {
       LOG.warn("Error registering FSDatasetState MBean", e);
     }
 
@@ -1343,6 +1369,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Block is in memory and not on the disk
         // Remove the block from volumeMap
         volumeMap.remove(bpid, blockId);
+        perVolumeReplicaMap.get((FsVolumeImpl) memBlockInfo.getVolume())
+            .remove(bpid, blockId);
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
         if (blockScanner != null) {
           blockScanner.deleteBlock(bpid, new Block(blockId));
 
@@ -1366,6 +1394,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
+        perVolumeReplicaMap.get((FsVolumeImpl) memBlockInfo.getVolume()).
+            remove(bpid, diskBlockInfo);
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
         if (blockScanner != null) {
           blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
 
@@ -1639,7 +1669,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Adding block pool " + bpid);
     volumes.addBlockPool(bpid, conf);
     volumeMap.initBlockPool(bpid);
-    volumes.getVolumeMap(bpid, volumeMap);
+    volumes.getAllVolumesMap(bpid, volumeMap);
+
+    // TODO: Avoid the double scan.
+    for (FsVolumeImpl v : getVolumes()) {
+      ReplicaMap rMap = perVolumeReplicaMap.get(v);
+      rMap.initBlockPool(bpid);
+      volumes.getVolumeMap(bpid, v, rMap);
+    }
   }
 
   @Override
 
@@ -1649,11 +1686,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumes.removeBlockPool(bpid);
   }
 
-  @Override
-  public String[] getBlockPoolList() {
-    return volumeMap.getBlockPoolList();
-  }
-
   /**
    * Class for representing the Datanode volume information
   */
@@ -290,7 +290,7 @@ class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
-  String getStorageID() {
+  public String getStorageID() {
     return storageID;
   }
 
@@ -18,10 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
@@ -93,27 +90,37 @@ class FsVolumeList {
     return remaining;
   }
 
-  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  void initializeReplicaMaps(Map<FsVolumeImpl, ReplicaMap> perVolumeReplicaMap,
+                             ReplicaMap globalReplicaMap,
+                             Object mutex) throws IOException {
     for (FsVolumeImpl v : volumes) {
-      v.getVolumeMap(volumeMap);
+      ReplicaMap rMap = new ReplicaMap(mutex);
+      v.getVolumeMap(rMap);
+      perVolumeReplicaMap.put(v, rMap);
+      globalReplicaMap.addAll(rMap);
     }
   }
 
-  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+  void getAllVolumesMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    long totalStartTime = System.currentTimeMillis();
     for (FsVolumeImpl v : volumes) {
-      FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
-          " on volume " + v + "...");
-      long startTime = System.currentTimeMillis();
-      v.getVolumeMap(bpid, volumeMap);
-      long timeTaken = System.currentTimeMillis() - startTime;
-      FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
-          " on volume " + v + ": " + timeTaken + "ms");
+      getVolumeMap(bpid, v, volumeMap);
    }
+    long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
+    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
+        + totalTimeTaken + "ms");
+  }
+
+  void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
+      throws IOException {
+    FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
+        " on volume " + volume + "...");
+    long startTime = System.currentTimeMillis();
+    volume.getVolumeMap(bpid, volumeMap);
+    long timeTaken = System.currentTimeMillis() - startTime;
+    FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
+        " on volume " + volume + ": " + timeTaken + "ms");
   }
 
   /**
   * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
@@ -117,6 +117,15 @@ class ReplicaMap {
       return m.put(replicaInfo.getBlockId(), replicaInfo);
     }
   }
 
+  /**
+   * Add all entries from the given replica map into the local replica
+   * map.
+   * @param
+   */
+  void addAll(ReplicaMap other) {
+    map.putAll(other.map);
+  }
+
   /**
    * Remove the replica's meta information from the map that matches
@@ -289,6 +289,7 @@ public class JsonUtil {
       return null;
     }
 
+    // TODO: Fix storageID
     final Map<String, Object> m = new TreeMap<String, Object>();
     m.put("ipAddr", datanodeinfo.getIpAddr());
     m.put("hostName", datanodeinfo.getHostName());
 
@@ -314,6 +315,7 @@ public class JsonUtil {
       return null;
     }
 
+    // TODO: Fix storageID
     return new DatanodeInfo(
         (String)m.get("ipAddr"),
         (String)m.get("hostName"),
@@ -41,13 +41,7 @@ import java.net.Socket;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 
@@ -805,7 +799,8 @@ public class DFSTestUtil {
   }
 
   private static DatanodeID getDatanodeID(String ipAddr) {
-    return new DatanodeID(ipAddr, "localhost", "",
+    return new DatanodeID(ipAddr, "localhost",
+        UUID.randomUUID().toString(),
         DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
 
@@ -816,8 +811,8 @@ public class DFSTestUtil {
   }
 
   public static DatanodeID getLocalDatanodeID(int port) {
-    return new DatanodeID("127.0.0.1", "localhost", "",
-        port, port, port);
+    return new DatanodeID("127.0.0.1", "localhost",
+        UUID.randomUUID().toString(), port, port, port);
   }
 
   public static DatanodeDescriptor getLocalDatanodeDescriptor() {
 
@@ -838,8 +833,9 @@ public class DFSTestUtil {
 
   public static DatanodeInfo getDatanodeInfo(String ipAddr,
       String host, int port) {
-    return new DatanodeInfo(new DatanodeID(ipAddr, host, "",
-        port, DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+    return new DatanodeInfo(new DatanodeID(ipAddr, host,
+        UUID.randomUUID().toString(), port,
+        DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT));
   }
 
@@ -893,7 +889,8 @@ public class DFSTestUtil {
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
       int port, String rackLocation) {
-    DatanodeID dnId = new DatanodeID(ipAddr, "host", "", port,
+    DatanodeID dnId = new DatanodeID(ipAddr, "host",
+        UUID.randomUUID().toString(), port,
        DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
        DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
    return new DatanodeDescriptor(dnId, rackLocation);
@ -172,7 +172,7 @@ public class TestDatanodeRegistration {
|
|||
|
||||
// register a datanode
|
||||
DatanodeID dnId = new DatanodeID(DN_IP_ADDR, DN_HOSTNAME,
|
||||
"fake-storage-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
|
||||
"fake-datanode-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
|
||||
long nnCTime = cluster.getNamesystem().getFSImage().getStorage()
|
||||
.getCTime();
|
||||
StorageInfo mockStorageInfo = mock(StorageInfo.class);
|
||||
|
@ -188,7 +188,7 @@ public class TestDatanodeRegistration {
|
|||
|
||||
// register the same datanode again with a different storage ID
|
||||
dnId = new DatanodeID(DN_IP_ADDR, DN_HOSTNAME,
|
||||
"changed-fake-storage-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
|
||||
"changed-fake-datanode-id", DN_XFER_PORT, DN_INFO_PORT, DN_IPC_PORT);
|
||||
dnReg = new DatanodeRegistration(dnId,
|
||||
mockStorageInfo, null, VersionInfo.getVersion());
|
||||
rpcServer.registerDatanode(dnReg);
|
||||
|
|
|
@ -149,7 +149,7 @@ public class TestPeerCache {
|
|||
public void testAddAndRetrieve() throws Exception {
|
||||
PeerCache cache = new PeerCache(3, 100000);
|
||||
DatanodeID dnId = new DatanodeID("192.168.0.1",
|
||||
"fakehostname", "fake_storage_id",
|
||||
"fakehostname", "fake_datanode_id",
|
||||
100, 101, 102);
|
||||
FakePeer peer = new FakePeer(dnId, false);
|
||||
cache.put(dnId, peer);
|
||||
|
@ -169,7 +169,7 @@ public class TestPeerCache {
|
|||
FakePeer peers[] = new FakePeer[CAPACITY];
|
||||
for (int i = 0; i < CAPACITY; ++i) {
|
||||
dnIds[i] = new DatanodeID("192.168.0.1",
|
||||
"fakehostname_" + i, "fake_storage_id",
|
||||
"fakehostname_" + i, "fake_datanode_id",
|
||||
100, 101, 102);
|
||||
peers[i] = new FakePeer(dnIds[i], false);
|
||||
}
|
||||
|
@ -200,7 +200,7 @@ public class TestPeerCache {
|
|||
FakePeer peers[] = new FakePeer[CAPACITY + 1];
|
||||
for (int i = 0; i < dnIds.length; ++i) {
|
||||
dnIds[i] = new DatanodeID("192.168.0.1",
|
||||
"fakehostname_" + i, "fake_storage_id_" + i,
|
||||
"fakehostname_" + i, "fake_datanode_id_" + i,
|
||||
100, 101, 102);
|
||||
peers[i] = new FakePeer(dnIds[i], false);
|
||||
}
|
||||
|
@ -231,7 +231,7 @@ public class TestPeerCache {
|
|||
final int CAPACITY = 3;
|
||||
PeerCache cache = new PeerCache(CAPACITY, 100000);
|
||||
DatanodeID dnId = new DatanodeID("192.168.0.1",
|
||||
"fakehostname", "fake_storage_id",
|
||||
"fakehostname", "fake_datanode_id",
|
||||
100, 101, 102);
|
||||
HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
|
||||
for (int i = 0; i < CAPACITY; ++i) {
|
||||
|
@ -256,7 +256,7 @@ public class TestPeerCache {
|
|||
final int CAPACITY = 3;
|
||||
PeerCache cache = new PeerCache(CAPACITY, 100000);
|
||||
DatanodeID dnId = new DatanodeID("192.168.0.1",
|
||||
"fakehostname", "fake_storage_id",
|
||||
"fakehostname", "fake_datanode_id",
|
||||
100, 101, 102);
|
||||
HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
|
||||
for (int i = 0; i < CAPACITY; ++i) {
|
||||
|
|
|
@ -453,7 +453,7 @@ public class UpgradeUtilities {
|
|||
*/
|
||||
public static void createDataNodeVersionFile(File[] parent,
|
||||
StorageInfo version, String bpid, String bpidToWrite) throws IOException {
|
||||
DataStorage storage = new DataStorage(version, "doNotCare");
|
||||
DataStorage storage = new DataStorage(version);
|
||||
|
||||
File[] versionFiles = new File[parent.length];
|
||||
for (int i = 0; i < parent.length; i++) {
|
||||
|
|
|
@ -527,6 +527,8 @@ public class TestBlockManager {
|
|||
public void testSafeModeIBR() throws Exception {
|
||||
DatanodeDescriptor node = spy(nodes.get(0));
|
||||
DatanodeStorageInfo ds = node.getStorageInfos()[0];
|
||||
|
||||
// TODO: Needs to be fixed. DatanodeUuid is not storageID.
|
||||
node.setDatanodeUuid(ds.getStorageID());
|
||||
|
||||
node.isAlive = true;
|
||||
|
@ -571,7 +573,10 @@ public class TestBlockManager {
|
|||
public void testSafeModeIBRAfterIncremental() throws Exception {
|
||||
DatanodeDescriptor node = spy(nodes.get(0));
|
||||
DatanodeStorageInfo ds = node.getStorageInfos()[0];
|
||||
|
||||
// TODO: Needs to be fixed. DatanodeUuid is not storageID.
|
||||
node.setDatanodeUuid(ds.getStorageID());
|
||||
|
||||
node.isAlive = true;
|
||||
|
||||
DatanodeRegistration nodeReg =
|
||||
|
|
|
@ -445,9 +445,9 @@ public class TestJspHelper {
|
|||
|
||||
@Test
|
||||
public void testSortNodeByFields() throws Exception {
|
||||
DatanodeID dnId1 = new DatanodeID("127.0.0.1", "localhost1", "storage1",
|
||||
DatanodeID dnId1 = new DatanodeID("127.0.0.1", "localhost1", "datanode1",
|
||||
1234, 2345, 3456);
|
||||
DatanodeID dnId2 = new DatanodeID("127.0.0.2", "localhost2", "storage2",
|
||||
DatanodeID dnId2 = new DatanodeID("127.0.0.2", "localhost2", "datanode2",
|
||||
1235, 2346, 3457);
|
||||
DatanodeDescriptor dnDesc1 = new DatanodeDescriptor(dnId1, "rack1", 1024,
|
||||
100, 924, 100, 10, 2);
|
||||
|
|
|
@ -383,8 +383,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
|
||||
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
||||
if (storage != null) {
|
||||
storage.createStorageID();
|
||||
this.datanodeUuid = storage.getStorageID();
|
||||
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
|
||||
storage.createStorageID(storage.getStorageDir(i));
|
||||
}
|
||||
this.datanodeUuid = storage.getDatanodeUuid();
|
||||
} else {
|
||||
this.datanodeUuid = "unknownStorageId-" + UUID.randomUUID();
|
||||
}
|
||||
|
@ -459,6 +461,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
return new BlockListAsLongs(blocks, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Map<String, BlockListAsLongs> getBlockReports(
|
||||
String bpid) {
|
||||
Map<String, BlockListAsLongs> reports =
|
||||
new HashMap<String, BlockListAsLongs>();
|
||||
reports.put("", getBlockReport(bpid));
|
||||
return reports;
|
||||
}
|
||||
|
||||
@Override // FSDatasetMBean
|
||||
public long getCapacity() {
|
||||
return storage.getCapacity();
|
||||
|
@ -910,6 +921,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
|
||||
long recoveryId,
|
||||
long newlength) {
|
||||
// Caller does not care about the exact Storage UUID returned.
|
||||
return datanodeUuid;
|
||||
}
|
||||
|
||||
|
@ -964,11 +976,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getBlockPoolList() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FsVolumeSpi vol) {
|
||||
|
|
|
@ -368,7 +368,7 @@ public class TestBlockReport {
|
|||
int randIndex = rand.nextInt(blocks.size());
|
||||
// Get a block and screw its GS
|
||||
Block corruptedBlock = blocks.get(randIndex);
|
||||
String secondNode = cluster.getDataNodes().get(DN_N1).getDatanodeUuid();
|
||||
String secondNode = cluster.getDataNodes().get(DN_N1).getDatanodeId().getDatanodeUuid();
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Working with " + secondNode);
|
||||
LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
|
||||
|
|
|
@ -840,9 +840,9 @@ public class NNThroughputBenchmark implements Tool {
|
|||
"", getNodePort(dnIdx),
|
||||
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
|
||||
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT),
|
||||
new DataStorage(nsInfo, ""),
|
||||
new DataStorage(nsInfo),
|
||||
new ExportedBlockKeys(), VersionInfo.getVersion());
|
||||
DataNode.setNewStorageID(dnRegistration);
|
||||
// TODO: Fix NNThroughputBenchmark.
|
||||
// register datanode
|
||||
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
|
||||
//first block reports
|
||||
|
@ -939,7 +939,7 @@ public class NNThroughputBenchmark implements Tool {
|
|||
DatanodeInfo dnInfo = blockTargets[t];
|
||||
DatanodeRegistration receivedDNReg;
|
||||
receivedDNReg = new DatanodeRegistration(dnInfo,
|
||||
new DataStorage(nsInfo, dnInfo.getDatanodeUuid()),
|
||||
new DataStorage(nsInfo),
|
||||
new ExportedBlockKeys(), VersionInfo.getVersion());
|
||||
ReceivedDeletedBlockInfo[] rdBlocks = {
|
||||
new ReceivedDeletedBlockInfo(
|
||||
|
|