HDFS-6899: Merging r1619970 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619975 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-08-23 06:36:46 +00:00
parent 1846514dd2
commit 9d09b549c5
8 changed files with 171 additions and 36 deletions

View File

@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6758. block writer should pass the expected block size to HDFS-6758. block writer should pass the expected block size to
DataXceiverServer. (Arpit Agarwal) DataXceiverServer. (Arpit Agarwal)
HDFS-6899. Allow changing MiniDFSCluster volumes per DN and capacity
per volume. (Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -29,6 +29,7 @@
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
@ -49,7 +50,8 @@
* It uses the {@link FsDatasetImpl} object for synchronization. * It uses the {@link FsDatasetImpl} object for synchronization.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class FsVolumeImpl implements FsVolumeSpi { @VisibleForTesting
public class FsVolumeImpl implements FsVolumeSpi {
private final FsDatasetImpl dataset; private final FsDatasetImpl dataset;
private final String storageID; private final String storageID;
private final StorageType storageType; private final StorageType storageType;
@ -58,6 +60,12 @@ class FsVolumeImpl implements FsVolumeSpi {
private final File currentDir; // <StorageDirectory>/current private final File currentDir; // <StorageDirectory>/current
private final DF usage; private final DF usage;
private final long reserved; private final long reserved;
// Capacity configured. This is useful when we want to
// limit the visible capacity for tests. If negative, then we just
// query from the filesystem.
protected long configuredCapacity;
/** /**
* Per-volume worker pool that processes new blocks to cache. * Per-volume worker pool that processes new blocks to cache.
* The maximum number of workers per volume is bounded (configurable via * The maximum number of workers per volume is bounded (configurable via
@ -77,20 +85,26 @@ class FsVolumeImpl implements FsVolumeSpi {
File parent = currentDir.getParentFile(); File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf); this.usage = new DF(parent, conf);
this.storageType = storageType; this.storageType = storageType;
this.configuredCapacity = -1;
cacheExecutor = initializeCacheExecutor(parent);
}
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
final int maxNumThreads = dataset.datanode.getConf().getInt( final int maxNumThreads = dataset.datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
);
ThreadFactory workerFactory = new ThreadFactoryBuilder() ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setDaemon(true) .setDaemon(true)
.setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d") .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
.build(); .build();
cacheExecutor = new ThreadPoolExecutor( ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, maxNumThreads, 1, maxNumThreads,
60, TimeUnit.SECONDS, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new LinkedBlockingQueue<Runnable>(),
workerFactory); workerFactory);
cacheExecutor.allowCoreThreadTimeOut(true); executor.allowCoreThreadTimeOut(true);
return executor;
} }
File getCurrentDir() { File getCurrentDir() {
@ -129,9 +143,24 @@ long getBlockPoolUsed(String bpid) throws IOException {
* reserved capacity. * reserved capacity.
* @return the unreserved number of bytes left in this filesystem. May be zero. * @return the unreserved number of bytes left in this filesystem. May be zero.
*/ */
long getCapacity() { @VisibleForTesting
long remaining = usage.getCapacity() - reserved; public long getCapacity() {
return remaining > 0 ? remaining : 0; if (configuredCapacity < 0) {
long remaining = usage.getCapacity() - reserved;
return remaining > 0 ? remaining : 0;
}
return configuredCapacity;
}
/**
* This function MUST NOT be used outside of tests.
*
* @param capacity
*/
@VisibleForTesting
public void setCapacityForTesting(long capacity) {
this.configuredCapacity = capacity;
} }
@Override @Override

View File

@ -91,7 +91,9 @@
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@ -132,11 +134,15 @@ public class MiniDFSCluster {
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing"; = DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
// Changing this value may break some tests that assume it is 2. // Changing this default may break some tests that assume it is 2.
public static final int DIRS_PER_DATANODE = 2; private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
static { DefaultMetricsSystem.setMiniClusterMode(true); } static { DefaultMetricsSystem.setMiniClusterMode(true); }
public int getStoragesPerDatanode() {
return storagesPerDatanode;
}
/** /**
* Class to construct instances of MiniDFSClusters with specific options. * Class to construct instances of MiniDFSClusters with specific options.
*/ */
@ -146,6 +152,8 @@ public static class Builder {
private final Configuration conf; private final Configuration conf;
private int numDataNodes = 1; private int numDataNodes = 1;
private StorageType[][] storageTypes = null; private StorageType[][] storageTypes = null;
private StorageType[] storageTypes1D = null;
private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
private boolean format = true; private boolean format = true;
private boolean manageNameDfsDirs = true; private boolean manageNameDfsDirs = true;
private boolean manageNameDfsSharedDirs = true; private boolean manageNameDfsSharedDirs = true;
@ -156,6 +164,8 @@ public static class Builder {
private String[] racks = null; private String[] racks = null;
private String [] hosts = null; private String [] hosts = null;
private long [] simulatedCapacities = null; private long [] simulatedCapacities = null;
private long [][] storageCapacities = null;
private long [] storageCapacities1D = null;
private String clusterId = null; private String clusterId = null;
private boolean waitSafeMode = true; private boolean waitSafeMode = true;
private boolean setupHostsFile = false; private boolean setupHostsFile = false;
@ -193,17 +203,21 @@ public Builder numDataNodes(int val) {
return this; return this;
} }
/**
* Default: DEFAULT_STORAGES_PER_DATANODE
*/
public Builder storagesPerDatanode(int numStorages) {
this.storagesPerDatanode = numStorages;
return this;
}
/** /**
* Set the same storage type configuration for each datanode. * Set the same storage type configuration for each datanode.
* If storageTypes is uninitialized or passed null then * If storageTypes is uninitialized or passed null then
* StorageType.DEFAULT is used. * StorageType.DEFAULT is used.
*/ */
public Builder storageTypes(StorageType[] types) { public Builder storageTypes(StorageType[] types) {
assert types.length == DIRS_PER_DATANODE; this.storageTypes1D = types;
this.storageTypes = new StorageType[numDataNodes][types.length];
for (int i = 0; i < numDataNodes; ++i) {
this.storageTypes[i] = types;
}
return this; return this;
} }
@ -217,6 +231,26 @@ public Builder storageTypes(StorageType[][] types) {
return this; return this;
} }
/**
* Set the same storage capacity configuration for each datanode.
* If storageTypes is uninitialized or passed null then
* StorageType.DEFAULT is used.
*/
public Builder storageCapacities(long[] capacities) {
this.storageCapacities1D = capacities;
return this;
}
/**
* Set custom storage capacity configuration for each datanode.
* If storageCapacities is uninitialized or passed null then
* capacity is limited by available disk space.
*/
public Builder storageCapacities(long[][] capacities) {
this.storageCapacities = capacities;
return this;
}
/** /**
* Default: true * Default: true
*/ */
@ -290,6 +324,11 @@ public Builder hosts(String[] val) {
} }
/** /**
* Use SimulatedFSDataset and limit the capacity of each DN per
* the values passed in val.
*
* For limiting the capacity of volumes with real storage, see
* {@link FsVolumeImpl#setCapacityForTesting}
* Default: null * Default: null
*/ */
public Builder simulatedCapacities(long[] val) { public Builder simulatedCapacities(long[] val) {
@ -392,7 +431,28 @@ protected MiniDFSCluster(Builder builder) throws IOException {
LOG.info("starting cluster: numNameNodes=" + numNameNodes LOG.info("starting cluster: numNameNodes=" + numNameNodes
+ ", numDataNodes=" + builder.numDataNodes); + ", numDataNodes=" + builder.numDataNodes);
nameNodes = new NameNodeInfo[numNameNodes]; nameNodes = new NameNodeInfo[numNameNodes];
this.storagesPerDatanode = builder.storagesPerDatanode;
// Duplicate the storageType setting for each DN.
if (builder.storageTypes == null && builder.storageTypes1D != null) {
assert builder.storageTypes1D.length == storagesPerDatanode;
builder.storageTypes = new StorageType[builder.numDataNodes][storagesPerDatanode];
for (int i = 0; i < builder.numDataNodes; ++i) {
builder.storageTypes[i] = builder.storageTypes1D;
}
}
// Duplicate the storageCapacity setting for each DN.
if (builder.storageCapacities == null && builder.storageCapacities1D != null) {
assert builder.storageCapacities1D.length == storagesPerDatanode;
builder.storageCapacities = new long[builder.numDataNodes][storagesPerDatanode];
for (int i = 0; i < builder.numDataNodes; ++i) {
builder.storageCapacities[i] = builder.storageCapacities1D;
}
}
initMiniDFSCluster(builder.conf, initMiniDFSCluster(builder.conf,
builder.numDataNodes, builder.numDataNodes,
builder.storageTypes, builder.storageTypes,
@ -405,6 +465,7 @@ protected MiniDFSCluster(Builder builder) throws IOException {
builder.dnOption, builder.dnOption,
builder.racks, builder.racks,
builder.hosts, builder.hosts,
builder.storageCapacities,
builder.simulatedCapacities, builder.simulatedCapacities,
builder.clusterId, builder.clusterId,
builder.waitSafeMode, builder.waitSafeMode,
@ -447,6 +508,7 @@ public void setDnArgs(String ... args) {
private boolean waitSafeMode = true; private boolean waitSafeMode = true;
private boolean federation; private boolean federation;
private boolean checkExitOnShutdown = true; private boolean checkExitOnShutdown = true;
protected final int storagesPerDatanode;
/** /**
* A unique instance identifier for the cluster. This * A unique instance identifier for the cluster. This
@ -485,6 +547,7 @@ public void setStartOpt(StartupOption startOpt) {
*/ */
public MiniDFSCluster() { public MiniDFSCluster() {
nameNodes = new NameNodeInfo[0]; // No namenode in the cluster nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
synchronized (MiniDFSCluster.class) { synchronized (MiniDFSCluster.class) {
instanceId = instanceCount++; instanceId = instanceCount++;
} }
@ -659,11 +722,12 @@ public MiniDFSCluster(int nameNodePort,
String[] racks, String hosts[], String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException { long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
initMiniDFSCluster(conf, numDataNodes, null, format, initMiniDFSCluster(conf, numDataNodes, null, format,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, null, racks, hosts, operation, null, racks, hosts,
simulatedCapacities, null, true, false, null, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null); MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
} }
private void initMiniDFSCluster( private void initMiniDFSCluster(
@ -672,7 +736,8 @@ private void initMiniDFSCluster(
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy, boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs, StartupOption startOpt, boolean manageDataDfsDirs, StartupOption startOpt,
StartupOption dnStartOpt, String[] racks, StartupOption dnStartOpt, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId, String[] hosts,
long[][] storageCapacities, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile, boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown, MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
@ -746,7 +811,7 @@ private void initMiniDFSCluster(
// Start the DataNodes // Start the DataNodes
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs, startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
dnStartOpt != null ? dnStartOpt : startOpt, dnStartOpt != null ? dnStartOpt : startOpt,
racks, hosts, simulatedCapacities, setupHostsFile, racks, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays); checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
waitClusterUp(); waitClusterUp();
//make sure ProxyUsers uses the latest conf //make sure ProxyUsers uses the latest conf
@ -1121,8 +1186,8 @@ public void waitClusterUp() throws IOException {
String makeDataNodeDirs(int dnIndex, StorageType[] storageTypes) throws IOException { String makeDataNodeDirs(int dnIndex, StorageType[] storageTypes) throws IOException {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
assert storageTypes == null || storageTypes.length == DIRS_PER_DATANODE; assert storageTypes == null || storageTypes.length == storagesPerDatanode;
for (int j = 0; j < DIRS_PER_DATANODE; ++j) { for (int j = 0; j < storagesPerDatanode; ++j) {
File dir = getInstanceStorageDir(dnIndex, j); File dir = getInstanceStorageDir(dnIndex, j);
dir.mkdirs(); dir.mkdirs();
if (!dir.isDirectory()) { if (!dir.isDirectory()) {
@ -1198,7 +1263,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false, false, null); null, simulatedCapacities, setupHostsFile, false, false, null);
} }
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@ -1208,7 +1273,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException { boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null); null, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
} }
/** /**
@ -1242,12 +1307,15 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation, StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[][] storageCapacities,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig, boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException { Configuration[] dnConfOverlays) throws IOException {
assert storageCapacities == null || simulatedCapacities == null;
assert storageTypes == null || storageTypes.length == numDataNodes; assert storageTypes == null || storageTypes.length == numDataNodes;
assert storageCapacities == null || storageCapacities.length == numDataNodes;
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return; return;
@ -1300,7 +1368,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
operation != StartupOption.ROLLBACK) ? operation != StartupOption.ROLLBACK) ?
null : new String[] {operation.getName()}; null : new String[] {operation.getName()};
DataNode[] dns = new DataNode[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf); Configuration dnConf = new HdfsConfiguration(conf);
if (dnConfOverlays != null) { if (dnConfOverlays != null) {
@ -1391,10 +1459,24 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
dn.runDatanodeDaemon(); dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs,
secureResources, dn.getIpcPort())); secureResources, dn.getIpcPort()));
dns[i - curDatanodesNum] = dn;
} }
curDatanodesNum += numDataNodes; curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes; this.numDataNodes += numDataNodes;
waitActive(); waitActive();
if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
assert storageCapacities[i].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
volume.setCapacityForTesting(storageCapacities[i][j]);
}
}
}
} }

View File

@ -22,6 +22,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,6 +32,8 @@
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -52,11 +55,15 @@ public static void setNodeGroups (String[] nodeGroups) {
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation, StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] nodeGroups, String[] hosts, String[] racks, String[] nodeGroups, String[] hosts,
long[][] storageCapacities,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException { boolean checkDataNodeHostConfig) throws IOException {
assert storageCapacities == null || simulatedCapacities == null;
assert storageTypes == null || storageTypes.length == numDataNodes; assert storageTypes == null || storageTypes.length == numDataNodes;
assert storageCapacities == null || storageCapacities.length == numDataNodes;
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return; return;
@ -109,6 +116,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
operation != StartupOption.ROLLBACK) ? operation != StartupOption.ROLLBACK) ?
null : new String[] {operation.getName()}; null : new String[] {operation.getName()};
DataNode[] dns = new DataNode[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf); Configuration dnConf = new HdfsConfiguration(conf);
// Set up datanode address // Set up datanode address
@ -181,10 +189,23 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
} }
dn.runDatanodeDaemon(); dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources, dn.getIpcPort())); dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources, dn.getIpcPort()));
dns[i - curDatanodesNum] = dn;
} }
curDatanodesNum += numDataNodes; curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes; this.numDataNodes += numDataNodes;
waitActive(); waitActive();
if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
volume.setCapacityForTesting(storageCapacities[i][j]);
}
}
}
} }
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@ -193,7 +214,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, nodeGroups, startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, nodeGroups,
hosts, simulatedCapacities, setupHostsFile, false, false); hosts, null, simulatedCapacities, setupHostsFile, false, false);
} }
public void startDataNodes(Configuration conf, int numDataNodes, public void startDataNodes(Configuration conf, int numDataNodes,
@ -209,13 +230,14 @@ public void startDataNodes(Configuration conf, int numDataNodes,
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation, StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[][] storageCapacities,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig, boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException { Configuration[] dnConfOverlays) throws IOException {
startDataNodes(conf, numDataNodes, storageTypes, manageDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, storageTypes, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile, NODE_GROUPS, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig); checkDataNodeAddrConfig, checkDataNodeHostConfig);
} }

View File

@ -213,7 +213,7 @@ public void testInitializeReplQueuesEarly() throws Exception {
@Override @Override
public Boolean get() { public Boolean get() {
return getLongCounter("StorageBlockReportOps", getMetrics(NN_METRICS)) == return getLongCounter("StorageBlockReportOps", getMetrics(NN_METRICS)) ==
MiniDFSCluster.DIRS_PER_DATANODE; cluster.getStoragesPerDatanode();
} }
}, 10, 10000); }, 10, 10000);

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -106,7 +105,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
DatanodeRegistration dnReg = dn.getDNRegistrationForBP(bpid); DatanodeRegistration dnReg = dn.getDNRegistrationForBP(bpid);
StorageBlockReport reports[] = StorageBlockReport reports[] =
new StorageBlockReport[MiniDFSCluster.DIRS_PER_DATANODE]; new StorageBlockReport[cluster.getStoragesPerDatanode()];
ArrayList<Block> blocks = new ArrayList<Block>(); ArrayList<Block> blocks = new ArrayList<Block>();
@ -114,7 +113,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
blocks.add(locatedBlock.getBlock().getLocalBlock()); blocks.add(locatedBlock.getBlock().getLocalBlock());
} }
for (int i = 0; i < MiniDFSCluster.DIRS_PER_DATANODE; ++i) { for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
BlockListAsLongs bll = new BlockListAsLongs(blocks, null); BlockListAsLongs bll = new BlockListAsLongs(blocks, null);
FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i); FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
DatanodeStorage dns = new DatanodeStorage(v.getStorageID()); DatanodeStorage dns = new DatanodeStorage(v.getStorageID());

View File

@ -130,7 +130,7 @@ public void testAlwaysSplit() throws IOException, InterruptedException {
ArgumentCaptor<StorageBlockReport[]> captor = ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class); ArgumentCaptor.forClass(StorageBlockReport[].class);
Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport( Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
captor.capture()); captor.capture());
@ -167,7 +167,7 @@ public void testCornerCaseUnderThreshold() throws IOException, InterruptedExcept
anyString(), anyString(),
captor.capture()); captor.capture());
verifyCapturedArguments(captor, MiniDFSCluster.DIRS_PER_DATANODE, BLOCKS_IN_FILE); verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
} }
/** /**
@ -194,7 +194,7 @@ public void testCornerCaseAtThreshold() throws IOException, InterruptedException
ArgumentCaptor<StorageBlockReport[]> captor = ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class); ArgumentCaptor.forClass(StorageBlockReport[].class);
Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport( Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class), any(DatanodeRegistration.class),
anyString(), anyString(),
captor.capture()); captor.capture());

View File

@ -444,7 +444,7 @@ public void testSyncAndBlockReportMetric() throws Exception {
assertCounter("SyncsNumOps", 1L, rb); assertCounter("SyncsNumOps", 1L, rb);
// Each datanode reports in when the cluster comes up // Each datanode reports in when the cluster comes up
assertCounter("BlockReportNumOps", assertCounter("BlockReportNumOps",
(long)DATANODE_COUNT*MiniDFSCluster.DIRS_PER_DATANODE, rb); (long)DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
// Sleep for an interval+slop to let the percentiles rollover // Sleep for an interval+slop to let the percentiles rollover
Thread.sleep((PERCENTILES_INTERVAL+1)*1000); Thread.sleep((PERCENTILES_INTERVAL+1)*1000);