HDFS-10824. MiniDFSCluster#storageCapacities has no effects on real capacity. Contributed by Xiaobing Zhou.
This commit is contained in:
parent
bc91e33d5e
commit
50888471d9
|
@ -57,6 +57,7 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -547,6 +548,8 @@ public class MiniDFSCluster implements AutoCloseable {
|
||||||
protected final int storagesPerDatanode;
|
protected final int storagesPerDatanode;
|
||||||
private Set<FileSystem> fileSystems = Sets.newHashSet();
|
private Set<FileSystem> fileSystems = Sets.newHashSet();
|
||||||
|
|
||||||
|
private List<long[]> storageCap = Lists.newLinkedList();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A unique instance identifier for the cluster. This
|
* A unique instance identifier for the cluster. This
|
||||||
* is used to disambiguate HA filesystems in the case where
|
* is used to disambiguate HA filesystems in the case where
|
||||||
|
@ -1546,25 +1549,16 @@ public class MiniDFSCluster implements AutoCloseable {
|
||||||
this.numDataNodes += numDataNodes;
|
this.numDataNodes += numDataNodes;
|
||||||
waitActive();
|
waitActive();
|
||||||
|
|
||||||
if (storageCapacities != null) {
|
|
||||||
for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) {
|
|
||||||
final int index = i - curDatanodesNum;
|
|
||||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
|
||||||
dns[index].getFSDataset().getFsVolumeReferences()) {
|
|
||||||
assert storageCapacities[index].length == storagesPerDatanode;
|
|
||||||
assert volumes.size() == storagesPerDatanode;
|
|
||||||
|
|
||||||
int j = 0;
|
setDataNodeStorageCapacities(
|
||||||
for (FsVolumeSpi fvs : volumes) {
|
curDatanodesNum,
|
||||||
FsVolumeImpl volume = (FsVolumeImpl) fvs;
|
numDataNodes,
|
||||||
LOG.info("setCapacityForTesting " + storageCapacities[index][j]
|
dns,
|
||||||
+ " for [" + volume.getStorageType() + "]" + volume
|
storageCapacities);
|
||||||
.getStorageID());
|
|
||||||
volume.setCapacityForTesting(storageCapacities[index][j]);
|
/* memorize storage capacities */
|
||||||
j++;
|
if (storageCapacities != null) {
|
||||||
}
|
storageCap.addAll(Arrays.asList(storageCapacities));
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2126,6 +2120,16 @@ public class MiniDFSCluster implements AutoCloseable {
|
||||||
return restartDataNode(dnprop, false);
|
return restartDataNode(dnprop, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void waitDataNodeFullyStarted(final DataNode dn)
|
||||||
|
throws TimeoutException, InterruptedException {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return dn.isDatanodeFullyStarted();
|
||||||
|
}
|
||||||
|
}, 100, 60000);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart a datanode, on the same port if requested
|
* Restart a datanode, on the same port if requested
|
||||||
* @param dnprop the datanode to restart
|
* @param dnprop the datanode to restart
|
||||||
|
@ -2146,13 +2150,69 @@ public class MiniDFSCluster implements AutoCloseable {
|
||||||
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
|
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
|
||||||
addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort);
|
addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort);
|
||||||
}
|
}
|
||||||
DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
|
final DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
|
||||||
dataNodes.add(new DataNodeProperties(
|
|
||||||
newDn, newconf, args, secureResources, newDn.getIpcPort()));
|
final DataNodeProperties dnp = new DataNodeProperties(
|
||||||
|
newDn,
|
||||||
|
newconf,
|
||||||
|
args,
|
||||||
|
secureResources,
|
||||||
|
newDn.getIpcPort());
|
||||||
|
dataNodes.add(dnp);
|
||||||
numDataNodes++;
|
numDataNodes++;
|
||||||
|
|
||||||
|
setDataNodeStorageCapacities(
|
||||||
|
dataNodes.lastIndexOf(dnp),
|
||||||
|
newDn,
|
||||||
|
storageCap.toArray(new long[][]{}));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void setDataNodeStorageCapacities(
|
||||||
|
final int curDatanodesNum,
|
||||||
|
final int numDNs,
|
||||||
|
final DataNode[] dns,
|
||||||
|
long[][] storageCapacities) throws IOException {
|
||||||
|
if (storageCapacities != null) {
|
||||||
|
for (int i = curDatanodesNum; i < curDatanodesNum + numDNs; ++i) {
|
||||||
|
final int index = i - curDatanodesNum;
|
||||||
|
setDataNodeStorageCapacities(index, dns[index], storageCapacities);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void setDataNodeStorageCapacities(
|
||||||
|
final int curDnIdx,
|
||||||
|
final DataNode curDn,
|
||||||
|
long[][] storageCapacities) throws IOException {
|
||||||
|
|
||||||
|
if (storageCapacities == null || storageCapacities.length == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
waitDataNodeFullyStarted(curDn);
|
||||||
|
} catch (TimeoutException | InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (FsDatasetSpi.FsVolumeReferences volumes = curDn.getFSDataset()
|
||||||
|
.getFsVolumeReferences()) {
|
||||||
|
assert storageCapacities[curDnIdx].length == storagesPerDatanode;
|
||||||
|
assert volumes.size() == storagesPerDatanode;
|
||||||
|
|
||||||
|
int j = 0;
|
||||||
|
for (FsVolumeSpi fvs : volumes) {
|
||||||
|
FsVolumeImpl volume = (FsVolumeImpl) fvs;
|
||||||
|
LOG.info("setCapacityForTesting " + storageCapacities[curDnIdx][j]
|
||||||
|
+ " for [" + volume.getStorageType() + "]" + volume.getStorageID());
|
||||||
|
volume.setCapacityForTesting(storageCapacities[curDnIdx][j]);
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(curDn);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Restart a particular datanode, use newly assigned port
|
* Restart a particular datanode, use newly assigned port
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -25,16 +25,25 @@ import static org.junit.Assume.assumeTrue;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
import org.apache.hadoop.test.PathUtils;
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests MiniDFS cluster setup/teardown and isolation.
|
* Tests MiniDFS cluster setup/teardown and isolation.
|
||||||
* Every instance is brought up with a new data dir, to ensure that
|
* Every instance is brought up with a new data dir, to ensure that
|
||||||
|
@ -78,6 +87,115 @@ public class TestMiniDFSCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests storage capacity setting still effective after cluster restart.
|
||||||
|
*/
|
||||||
|
@Test(timeout=100000)
|
||||||
|
public void testClusterSetStorageCapacity() throws Throwable {
|
||||||
|
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
final int numDatanodes = 1;
|
||||||
|
final int defaultBlockSize = 1024;
|
||||||
|
final int blocks = 100;
|
||||||
|
final int blocksSize = 1024;
|
||||||
|
final int fileLen = blocks * blocksSize;
|
||||||
|
final long capcacity = defaultBlockSize * 2 * fileLen;
|
||||||
|
final long[] capacities = new long[] {capcacity, 2 * capcacity};
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = newCluster(
|
||||||
|
conf,
|
||||||
|
numDatanodes,
|
||||||
|
capacities,
|
||||||
|
defaultBlockSize,
|
||||||
|
fileLen);
|
||||||
|
verifyStorageCapacity(cluster, capacities);
|
||||||
|
|
||||||
|
/* restart all data nodes */
|
||||||
|
cluster.restartDataNodes();
|
||||||
|
cluster.waitActive();
|
||||||
|
verifyStorageCapacity(cluster, capacities);
|
||||||
|
|
||||||
|
/* restart all name nodes */
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
cluster.waitActive();
|
||||||
|
verifyStorageCapacity(cluster, capacities);
|
||||||
|
|
||||||
|
/* restart all name nodes firstly and data nodes then */
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
cluster.restartDataNodes();
|
||||||
|
cluster.waitActive();
|
||||||
|
verifyStorageCapacity(cluster, capacities);
|
||||||
|
|
||||||
|
/* restart all data nodes firstly and name nodes then */
|
||||||
|
cluster.restartDataNodes();
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
cluster.waitActive();
|
||||||
|
verifyStorageCapacity(cluster, capacities);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyStorageCapacity(
|
||||||
|
final MiniDFSCluster cluster,
|
||||||
|
final long[] capacities) throws IOException {
|
||||||
|
|
||||||
|
FsVolumeImpl source = null;
|
||||||
|
FsVolumeImpl dest = null;
|
||||||
|
|
||||||
|
/* verify capacity */
|
||||||
|
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
|
||||||
|
final DataNode dnNode = cluster.getDataNodes().get(i);
|
||||||
|
try (FsDatasetSpi.FsVolumeReferences refs = dnNode.getFSDataset()
|
||||||
|
.getFsVolumeReferences()) {
|
||||||
|
source = (FsVolumeImpl) refs.get(0);
|
||||||
|
dest = (FsVolumeImpl) refs.get(1);
|
||||||
|
assertEquals(capacities[0], source.getCapacity());
|
||||||
|
assertEquals(capacities[1], dest.getCapacity());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MiniDFSCluster newCluster(
|
||||||
|
final Configuration conf,
|
||||||
|
final int numDatanodes,
|
||||||
|
final long[] storageCapacities,
|
||||||
|
final int defaultBlockSize,
|
||||||
|
final int fileLen)
|
||||||
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
|
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
|
|
||||||
|
final String fileName = "/" + UUID.randomUUID().toString();
|
||||||
|
final Path filePath = new Path(fileName);
|
||||||
|
|
||||||
|
Preconditions.checkNotNull(storageCapacities);
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
storageCapacities.length == 2,
|
||||||
|
"need to specify capacities for two storages.");
|
||||||
|
|
||||||
|
/* Write a file and restart the cluster */
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(numDatanodes)
|
||||||
|
.storageCapacities(storageCapacities)
|
||||||
|
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
|
||||||
|
.storagesPerDatanode(2)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
final short replicationFactor = (short) 1;
|
||||||
|
final Random r = new Random();
|
||||||
|
FileSystem fs = cluster.getFileSystem(0);
|
||||||
|
DFSTestUtil.createFile(
|
||||||
|
fs,
|
||||||
|
filePath,
|
||||||
|
fileLen,
|
||||||
|
replicationFactor,
|
||||||
|
r.nextLong());
|
||||||
|
DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
|
||||||
|
|
||||||
|
return cluster;
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=100000)
|
@Test(timeout=100000)
|
||||||
public void testIsClusterUpAfterShutdown() throws Throwable {
|
public void testIsClusterUpAfterShutdown() throws Throwable {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
|
Loading…
Reference in New Issue