HDFS-6899: Merging r1619970 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619973 13f79535-47bb-0310-9956-ffa450edef68
parent 3031e29a90
commit 554476881f

@@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6758. block writer should pass the expected block size to
     DataXceiverServer. (Arpit Agarwal)
 
+    HDFS-6899. Allow changing MiniDFSCluster volumes per DN and capacity
+    per volume. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -29,6 +29,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
@@ -49,7 +50,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * It uses the {@link FsDatasetImpl} object for synchronization.
  */
 @InterfaceAudience.Private
-class FsVolumeImpl implements FsVolumeSpi {
+@VisibleForTesting
+public class FsVolumeImpl implements FsVolumeSpi {
   private final FsDatasetImpl dataset;
   private final String storageID;
   private final StorageType storageType;
@@ -58,6 +60,12 @@ class FsVolumeImpl implements FsVolumeSpi {
   private final File currentDir;    // <StorageDirectory>/current
   private final DF usage;
   private final long reserved;
+
+  // Capacity configured. This is useful when we want to
+  // limit the visible capacity for tests. If negative, then we just
+  // query from the filesystem.
+  protected long configuredCapacity;
+
   /**
    * Per-volume worker pool that processes new blocks to cache.
    * The maximum number of workers per volume is bounded (configurable via
@@ -77,20 +85,26 @@ class FsVolumeImpl implements FsVolumeSpi {
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
+    this.configuredCapacity = -1;
+    cacheExecutor = initializeCacheExecutor(parent);
+  }
+
+  protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
     final int maxNumThreads = dataset.datanode.getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
-        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT
-        );
+        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
+
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
         .setDaemon(true)
         .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
         .build();
-    cacheExecutor = new ThreadPoolExecutor(
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
         1, maxNumThreads,
         60, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(),
         workerFactory);
-    cacheExecutor.allowCoreThreadTimeOut(true);
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
   }
 
   File getCurrentDir() {
@@ -129,11 +143,26 @@ class FsVolumeImpl implements FsVolumeSpi {
    * reserved capacity.
    * @return the unreserved number of bytes left in this filesystem. May be zero.
    */
-  long getCapacity() {
-    long remaining = usage.getCapacity() - reserved;
-    return remaining > 0 ? remaining : 0;
+  @VisibleForTesting
+  public long getCapacity() {
+    if (configuredCapacity < 0) {
+      long remaining = usage.getCapacity() - reserved;
+      return remaining > 0 ? remaining : 0;
+    }
+
+    return configuredCapacity;
+  }
+
+  /**
+   * This function MUST NOT be used outside of tests.
+   *
+   * @param capacity
+   */
+  @VisibleForTesting
+  public void setCapacityForTesting(long capacity) {
+    this.configuredCapacity = capacity;
   }
 
   @Override
   public long getAvailable() throws IOException {
     long remaining = getCapacity()-getDfsUsed();
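Aside: a minimal sketch of how the new FsVolumeImpl test hook is meant to behave (hypothetical test code, not part of this patch; the variable `dn` and the 1 GB figure are assumptions for illustration):

    // Pick any real volume from a running (mini-cluster) DataNode's dataset.
    FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);

    // configuredCapacity defaults to -1, so getCapacity() still reports the
    // actual filesystem capacity minus the reserved bytes.
    long realCapacity = volume.getCapacity();

    // After the test-only override, getCapacity() returns the configured value
    // instead of querying the filesystem.
    volume.setCapacityForTesting(1024L * 1024 * 1024);   // pretend this volume is 1 GB
    assert volume.getCapacity() == 1024L * 1024 * 1024;
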
@@ -55,7 +55,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.channels.FileChannel;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -91,7 +90,9 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -132,11 +133,15 @@ public class MiniDFSCluster {
   public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
       = DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
 
-  // Changing this value may break some tests that assume it is 2.
-  public static final int DIRS_PER_DATANODE = 2;
+  // Changing this default may break some tests that assume it is 2.
+  private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
 
   static { DefaultMetricsSystem.setMiniClusterMode(true); }
 
+  public int getStoragesPerDatanode() {
+    return storagesPerDatanode;
+  }
+
   /**
    * Class to construct instances of MiniDFSClusters with specific options.
    */
@@ -146,6 +151,8 @@ public class MiniDFSCluster {
     private final Configuration conf;
     private int numDataNodes = 1;
     private StorageType[][] storageTypes = null;
+    private StorageType[] storageTypes1D = null;
+    private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     private boolean format = true;
     private boolean manageNameDfsDirs = true;
     private boolean manageNameDfsSharedDirs = true;
@@ -156,6 +163,8 @@ public class MiniDFSCluster {
     private String[] racks = null;
     private String [] hosts = null;
     private long [] simulatedCapacities = null;
+    private long [][] storageCapacities = null;
+    private long [] storageCapacities1D = null;
     private String clusterId = null;
     private boolean waitSafeMode = true;
     private boolean setupHostsFile = false;
@@ -193,17 +202,21 @@ public class MiniDFSCluster {
       return this;
     }
 
+    /**
+     * Default: DEFAULT_STORAGES_PER_DATANODE
+     */
+    public Builder storagesPerDatanode(int numStorages) {
+      this.storagesPerDatanode = numStorages;
+      return this;
+    }
+
     /**
      * Set the same storage type configuration for each datanode.
      * If storageTypes is uninitialized or passed null then
      * StorageType.DEFAULT is used.
      */
     public Builder storageTypes(StorageType[] types) {
-      assert types.length == DIRS_PER_DATANODE;
-      this.storageTypes = new StorageType[numDataNodes][types.length];
-      for (int i = 0; i < numDataNodes; ++i) {
-        this.storageTypes[i] = types;
-      }
+      this.storageTypes1D = types;
       return this;
     }
 
@@ -217,6 +230,26 @@ public class MiniDFSCluster {
       return this;
     }
 
+    /**
+     * Set the same storage capacity configuration for each datanode.
+     * If storageTypes is uninitialized or passed null then
+     * StorageType.DEFAULT is used.
+     */
+    public Builder storageCapacities(long[] capacities) {
+      this.storageCapacities1D = capacities;
+      return this;
+    }
+
+    /**
+     * Set custom storage capacity configuration for each datanode.
+     * If storageCapacities is uninitialized or passed null then
+     * capacity is limited by available disk space.
+     */
+    public Builder storageCapacities(long[][] capacities) {
+      this.storageCapacities = capacities;
+      return this;
+    }
+
     /**
      * Default: true
      */
@@ -290,6 +323,11 @@ public class MiniDFSCluster {
     }
 
     /**
+     * Use SimulatedFSDataset and limit the capacity of each DN per
+     * the values passed in val.
+     *
+     * For limiting the capacity of volumes with real storage, see
+     * {@link FsVolumeImpl#setCapacityForTesting}
      * Default: null
      */
     public Builder simulatedCapacities(long[] val) {
@@ -392,6 +430,27 @@ public class MiniDFSCluster {
     LOG.info("starting cluster: numNameNodes=" + numNameNodes
         + ", numDataNodes=" + builder.numDataNodes);
     nameNodes = new NameNodeInfo[numNameNodes];
+    this.storagesPerDatanode = builder.storagesPerDatanode;
+
+    // Duplicate the storageType setting for each DN.
+    if (builder.storageTypes == null && builder.storageTypes1D != null) {
+      assert builder.storageTypes1D.length == storagesPerDatanode;
+      builder.storageTypes = new StorageType[builder.numDataNodes][storagesPerDatanode];
+
+      for (int i = 0; i < builder.numDataNodes; ++i) {
+        builder.storageTypes[i] = builder.storageTypes1D;
+      }
+    }
+
+    // Duplicate the storageCapacity setting for each DN.
+    if (builder.storageCapacities == null && builder.storageCapacities1D != null) {
+      assert builder.storageCapacities1D.length == storagesPerDatanode;
+      builder.storageCapacities = new long[builder.numDataNodes][storagesPerDatanode];
+
+      for (int i = 0; i < builder.numDataNodes; ++i) {
+        builder.storageCapacities[i] = builder.storageCapacities1D;
+      }
+    }
 
     initMiniDFSCluster(builder.conf,
                        builder.numDataNodes,
@@ -405,6 +464,7 @@ public class MiniDFSCluster {
                        builder.dnOption,
                        builder.racks,
                        builder.hosts,
+                       builder.storageCapacities,
                        builder.simulatedCapacities,
                        builder.clusterId,
                        builder.waitSafeMode,
@@ -447,6 +507,7 @@ public class MiniDFSCluster {
   private boolean waitSafeMode = true;
   private boolean federation;
   private boolean checkExitOnShutdown = true;
+  protected final int storagesPerDatanode;
 
   /**
    * A unique instance identifier for the cluster. This
@@ -485,6 +546,7 @@ public class MiniDFSCluster {
    */
   public MiniDFSCluster() {
     nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
+    storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     synchronized (MiniDFSCluster.class) {
       instanceId = instanceCount++;
     }
@@ -659,10 +721,11 @@ public class MiniDFSCluster {
                         String[] racks, String hosts[],
                         long[] simulatedCapacities) throws IOException {
     this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
+    this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     initMiniDFSCluster(conf, numDataNodes, null, format,
         manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
         operation, null, racks, hosts,
-        simulatedCapacities, null, true, false,
+        null, simulatedCapacities, null, true, false,
         MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
   }
 
@@ -672,7 +735,8 @@ public class MiniDFSCluster {
       boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
       boolean manageDataDfsDirs, StartupOption startOpt,
       StartupOption dnStartOpt, String[] racks,
-      String[] hosts, long[] simulatedCapacities, String clusterId,
+      String[] hosts,
+      long[][] storageCapacities, long[] simulatedCapacities, String clusterId,
       boolean waitSafeMode, boolean setupHostsFile,
       MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
       boolean checkDataNodeAddrConfig,
@@ -746,7 +810,7 @@ public class MiniDFSCluster {
     // Start the DataNodes
     startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
         dnStartOpt != null ? dnStartOpt : startOpt,
-        racks, hosts, simulatedCapacities, setupHostsFile,
+        racks, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
         checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
     waitClusterUp();
     //make sure ProxyUsers uses the latest conf
@@ -1121,8 +1185,8 @@ public class MiniDFSCluster {
 
   String makeDataNodeDirs(int dnIndex, StorageType[] storageTypes) throws IOException {
     StringBuilder sb = new StringBuilder();
-    assert storageTypes == null || storageTypes.length == DIRS_PER_DATANODE;
-    for (int j = 0; j < DIRS_PER_DATANODE; ++j) {
+    assert storageTypes == null || storageTypes.length == storagesPerDatanode;
+    for (int j = 0; j < storagesPerDatanode; ++j) {
       File dir = getInstanceStorageDir(dnIndex, j);
       dir.mkdirs();
       if (!dir.isDirectory()) {
@@ -1198,7 +1262,7 @@ public class MiniDFSCluster {
                              long[] simulatedCapacities,
                              boolean setupHostsFile) throws IOException {
     startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
-        simulatedCapacities, setupHostsFile, false, false, null);
+        null, simulatedCapacities, setupHostsFile, false, false, null);
   }
 
   public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@@ -1208,7 +1272,7 @@ public class MiniDFSCluster {
                              boolean setupHostsFile,
                              boolean checkDataNodeAddrConfig) throws IOException {
     startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
-        simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
+        null, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
   }
 
   /**
@@ -1242,12 +1306,15 @@ public class MiniDFSCluster {
   public synchronized void startDataNodes(Configuration conf, int numDataNodes,
       StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
       String[] racks, String[] hosts,
+      long[][] storageCapacities,
       long[] simulatedCapacities,
       boolean setupHostsFile,
       boolean checkDataNodeAddrConfig,
       boolean checkDataNodeHostConfig,
       Configuration[] dnConfOverlays) throws IOException {
+    assert storageCapacities == null || simulatedCapacities == null;
     assert storageTypes == null || storageTypes.length == numDataNodes;
+    assert storageCapacities == null || storageCapacities.length == numDataNodes;
 
     if (operation == StartupOption.RECOVER) {
       return;
@@ -1300,7 +1367,7 @@ public class MiniDFSCluster {
         operation != StartupOption.ROLLBACK) ?
         null : new String[] {operation.getName()};
 
+    DataNode[] dns = new DataNode[numDataNodes];
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
       Configuration dnConf = new HdfsConfiguration(conf);
       if (dnConfOverlays != null) {
@@ -1391,10 +1458,24 @@ public class MiniDFSCluster {
       dn.runDatanodeDaemon();
       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs,
           secureResources, dn.getIpcPort()));
+      dns[i - curDatanodesNum] = dn;
     }
     curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
     waitActive();
+
+    if (storageCapacities != null) {
+      for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+        List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
+        assert storageCapacities[i].length == storagesPerDatanode;
+        assert volumes.size() == storagesPerDatanode;
+
+        for (int j = 0; j < volumes.size(); ++j) {
+          FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
+          volume.setCapacityForTesting(storageCapacities[i][j]);
+        }
+      }
+    }
   }
 
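Aside: a minimal usage sketch of the new Builder options added above (hypothetical test code, not the committed tests; the capacity values are assumptions for illustration):

    // Two DataNodes, three storage directories per DataNode, and a distinct
    // capacity limit for each volume. The 1-D capacities array is duplicated
    // for every DN by the MiniDFSCluster constructor, as shown in the hunk above.
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(2)
        .storagesPerDatanode(3)
        .storageCapacities(new long[] { 1L << 30, 2L << 30, 4L << 30 })
        .build();
    try {
      assert cluster.getStoragesPerDatanode() == 3;
    } finally {
      cluster.shutdown();
    }
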
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +32,8 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -52,11 +55,15 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
   public synchronized void startDataNodes(Configuration conf, int numDataNodes,
       StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
       String[] racks, String[] nodeGroups, String[] hosts,
+      long[][] storageCapacities,
       long[] simulatedCapacities,
       boolean setupHostsFile,
       boolean checkDataNodeAddrConfig,
       boolean checkDataNodeHostConfig) throws IOException {
+
+    assert storageCapacities == null || simulatedCapacities == null;
     assert storageTypes == null || storageTypes.length == numDataNodes;
+    assert storageCapacities == null || storageCapacities.length == numDataNodes;
 
     if (operation == StartupOption.RECOVER) {
       return;
@@ -109,6 +116,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
         operation != StartupOption.ROLLBACK) ?
         null : new String[] {operation.getName()};
 
+    DataNode[] dns = new DataNode[numDataNodes];
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
       Configuration dnConf = new HdfsConfiguration(conf);
       // Set up datanode address
@@ -181,10 +189,23 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
       }
       dn.runDatanodeDaemon();
       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources, dn.getIpcPort()));
+      dns[i - curDatanodesNum] = dn;
     }
     curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
     waitActive();
+
+    if (storageCapacities != null) {
+      for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+        List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
+        assert volumes.size() == storagesPerDatanode;
+
+        for (int j = 0; j < volumes.size(); ++j) {
+          FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
+          volume.setCapacityForTesting(storageCapacities[i][j]);
+        }
+      }
+    }
   }
 
   public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@@ -193,7 +214,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
       long[] simulatedCapacities,
       boolean setupHostsFile) throws IOException {
     startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, nodeGroups,
-        hosts, simulatedCapacities, setupHostsFile, false, false);
+        hosts, null, simulatedCapacities, setupHostsFile, false, false);
   }
 
   public void startDataNodes(Configuration conf, int numDataNodes,
@@ -209,13 +230,14 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
   public synchronized void startDataNodes(Configuration conf, int numDataNodes,
       StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
       String[] racks, String[] hosts,
+      long[][] storageCapacities,
       long[] simulatedCapacities,
       boolean setupHostsFile,
       boolean checkDataNodeAddrConfig,
       boolean checkDataNodeHostConfig,
       Configuration[] dnConfOverlays) throws IOException {
     startDataNodes(conf, numDataNodes, storageTypes, manageDfsDirs, operation, racks,
-        NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
+        NODE_GROUPS, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
         checkDataNodeAddrConfig, checkDataNodeHostConfig);
   }
 
@@ -213,7 +213,7 @@ public class TestSafeMode {
       @Override
       public Boolean get() {
         return getLongCounter("StorageBlockReportOps", getMetrics(NN_METRICS)) ==
-            MiniDFSCluster.DIRS_PER_DATANODE;
+            cluster.getStoragesPerDatanode();
       }
     }, 10, 10000);
 
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
 
@@ -106,7 +105,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     DataNode dn = cluster.getDataNodes().get(0);
     DatanodeRegistration dnReg = dn.getDNRegistrationForBP(bpid);
     StorageBlockReport reports[] =
-        new StorageBlockReport[MiniDFSCluster.DIRS_PER_DATANODE];
+        new StorageBlockReport[cluster.getStoragesPerDatanode()];
 
     ArrayList<Block> blocks = new ArrayList<Block>();
 
@@ -114,7 +113,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
       blocks.add(locatedBlock.getBlock().getLocalBlock());
     }
 
-    for (int i = 0; i < MiniDFSCluster.DIRS_PER_DATANODE; ++i) {
+    for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
       BlockListAsLongs bll = new BlockListAsLongs(blocks, null);
       FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
       DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
@@ -130,7 +130,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     ArgumentCaptor<StorageBlockReport[]> captor =
         ArgumentCaptor.forClass(StorageBlockReport[].class);
 
-    Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
+    Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
         captor.capture());
@@ -167,7 +167,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
         anyString(),
         captor.capture());
 
-    verifyCapturedArguments(captor, MiniDFSCluster.DIRS_PER_DATANODE, BLOCKS_IN_FILE);
+    verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
   }
 
   /**
@@ -194,7 +194,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     ArgumentCaptor<StorageBlockReport[]> captor =
         ArgumentCaptor.forClass(StorageBlockReport[].class);
 
-    Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
+    Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
         captor.capture());
@@ -444,7 +444,7 @@ public class TestNameNodeMetrics {
     assertCounter("SyncsNumOps", 1L, rb);
     // Each datanode reports in when the cluster comes up
     assertCounter("BlockReportNumOps",
-        (long)DATANODE_COUNT*MiniDFSCluster.DIRS_PER_DATANODE, rb);
+        (long)DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
 
     // Sleep for an interval+slop to let the percentiles rollover
     Thread.sleep((PERCENTILES_INTERVAL+1)*1000);
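Aside: as a worked example of the counter arithmetic above (illustrative numbers, not taken from the test constants), a cluster with 3 DataNodes and the default two storage directories per DataNode reports 3 * 2 = 6 block-report operations at startup; the per-DN factor now comes from cluster.getStoragesPerDatanode() rather than the removed DIRS_PER_DATANODE constant.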