HDFS-15716. WaitforReplication in TestUpgradeDomainBlockPlacementPolicy (#2528)
This commit is contained in:
parent
4ffec79b7c
commit
01383a2172
|
@ -17,16 +17,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
|
||||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
@ -41,16 +38,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpg
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
||||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||||
import org.apache.hadoop.net.StaticMapping;
|
import org.apache.hadoop.net.StaticMapping;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End-to-end test case for upgrade domain
|
* End-to-end test case for upgrade domain
|
||||||
* The test configs upgrade domain for nodes via admin json
|
* The test configs upgrade domain for nodes via admin json
|
||||||
|
@ -63,6 +58,8 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
|
|
||||||
private static final short REPLICATION_FACTOR = (short) 3;
|
private static final short REPLICATION_FACTOR = (short) 3;
|
||||||
private static final int DEFAULT_BLOCK_SIZE = 1024;
|
private static final int DEFAULT_BLOCK_SIZE = 1024;
|
||||||
|
private static final int WAIT_TIMEOUT_MS = 60000;
|
||||||
|
private static final long FILE_SIZE = DEFAULT_BLOCK_SIZE * 5;
|
||||||
static final String[] racks =
|
static final String[] racks =
|
||||||
{ "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
|
{ "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
|
||||||
static final String[] hosts =
|
static final String[] hosts =
|
||||||
|
@ -71,9 +68,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
{"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
|
{"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
|
||||||
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
|
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
|
||||||
private MiniDFSCluster cluster = null;
|
private MiniDFSCluster cluster = null;
|
||||||
private NamenodeProtocols nameNodeRpc = null;
|
|
||||||
private FSNamesystem namesystem = null;
|
|
||||||
private PermissionStatus perm = null;
|
|
||||||
private HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
private HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -92,10 +86,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
|
||||||
.hosts(hosts).build();
|
.hosts(hosts).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
nameNodeRpc = cluster.getNameNodeRpc();
|
|
||||||
namesystem = cluster.getNamesystem();
|
|
||||||
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
|
|
||||||
FsPermission.getDefault());
|
|
||||||
refreshDatanodeAdminProperties();
|
refreshDatanodeAdminProperties();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,43 +176,51 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
|
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createFileAndWaitForReplication(final Path path,
|
||||||
|
final long fileLen)
|
||||||
|
throws Exception {
|
||||||
|
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileLen,
|
||||||
|
REPLICATION_FACTOR, 1000L);
|
||||||
|
DFSTestUtil.waitForReplication(cluster.getFileSystem(), path,
|
||||||
|
REPLICATION_FACTOR, WAIT_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPlacement() throws Exception {
|
public void testPlacement() throws Exception {
|
||||||
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
|
final long fileSize = FILE_SIZE;
|
||||||
final String testFile = new String("/testfile");
|
final String testFile = "/testfile";
|
||||||
final Path path = new Path(testFile);
|
final Path path = new Path(testFile);
|
||||||
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
|
createFileAndWaitForReplication(path, FILE_SIZE);
|
||||||
REPLICATION_FACTOR, 1000L);
|
|
||||||
LocatedBlocks locatedBlocks =
|
LocatedBlocks locatedBlocks =
|
||||||
cluster.getFileSystem().getClient().getLocatedBlocks(
|
cluster.getFileSystem().getClient().getLocatedBlocks(
|
||||||
path.toString(), 0, fileSize);
|
path.toString(), 0, fileSize);
|
||||||
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
||||||
Set<DatanodeInfo> locs = new HashSet<>();
|
Set<DatanodeInfo> locs = new HashSet<>();
|
||||||
for(DatanodeInfo datanodeInfo : block.getLocations()) {
|
for(DatanodeInfo datanodeInfo : block.getLocations()) {
|
||||||
if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) {
|
if (datanodeInfo.getAdminState()
|
||||||
|
.equals(DatanodeInfo.AdminStates.NORMAL)) {
|
||||||
locs.add(datanodeInfo);
|
locs.add(datanodeInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
||||||
assertTrue(locs.contains(datanodeID));
|
Assert.assertTrue(locs.contains(datanodeID));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testPlacementAfterDecommission() throws Exception {
|
public void testPlacementAfterDecommission() throws Exception {
|
||||||
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
|
final long fileSize = FILE_SIZE;
|
||||||
final String testFile = new String("/testfile");
|
final String testFile = "/testfile-afterdecomm";
|
||||||
final Path path = new Path(testFile);
|
final Path path = new Path(testFile);
|
||||||
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
|
createFileAndWaitForReplication(path, fileSize);
|
||||||
REPLICATION_FACTOR, 1000L);
|
|
||||||
|
|
||||||
// Decommission some nodes and wait until decommissions have finished.
|
// Decommission some nodes and wait until decommissions have finished.
|
||||||
refreshDatanodeAdminProperties2();
|
refreshDatanodeAdminProperties2();
|
||||||
|
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
@Override
|
@Override
|
||||||
public Boolean get() {
|
public Boolean get() {
|
||||||
boolean successful = true;
|
|
||||||
LocatedBlocks locatedBlocks;
|
LocatedBlocks locatedBlocks;
|
||||||
try {
|
try {
|
||||||
locatedBlocks =
|
locatedBlocks =
|
||||||
|
@ -231,32 +229,34 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
||||||
Set<DatanodeInfo> locs = new HashSet<>();
|
Set<DatanodeInfo> locs = new HashSet<>();
|
||||||
for (DatanodeInfo datanodeInfo : block.getLocations()) {
|
for (DatanodeInfo datanodeInfo : block.getLocations()) {
|
||||||
if (datanodeInfo.getAdminState() ==
|
if (datanodeInfo.getAdminState().equals(
|
||||||
DatanodeInfo.AdminStates.NORMAL) {
|
DatanodeInfo.AdminStates.NORMAL)) {
|
||||||
locs.add(datanodeInfo);
|
locs.add(datanodeInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
||||||
successful = successful && locs.contains(datanodeID);
|
if (!locs.contains(datanodeID)) {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return successful;
|
|
||||||
}
|
}
|
||||||
}, 1000, 60000);
|
return true;
|
||||||
|
}
|
||||||
|
}, 1000, WAIT_TIMEOUT_MS);
|
||||||
|
|
||||||
// Verify block placement policy of each block.
|
// Verify block placement policy of each block.
|
||||||
LocatedBlocks locatedBlocks;
|
LocatedBlocks locatedBlocks =
|
||||||
locatedBlocks =
|
|
||||||
cluster.getFileSystem().getClient().getLocatedBlocks(
|
cluster.getFileSystem().getClient().getLocatedBlocks(
|
||||||
path.toString(), 0, fileSize);
|
path.toString(), 0, fileSize);
|
||||||
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
||||||
BlockPlacementStatus status = cluster.getNamesystem().getBlockManager().
|
BlockPlacementStatus status =
|
||||||
getBlockPlacementPolicy().verifyBlockPlacement(
|
cluster.getNamesystem().getBlockManager()
|
||||||
block.getLocations(), REPLICATION_FACTOR);
|
.getBlockPlacementPolicy()
|
||||||
assertTrue(status.isPlacementPolicySatisfied());
|
.verifyBlockPlacement(block.getLocations(), REPLICATION_FACTOR);
|
||||||
|
Assert.assertTrue(status.isPlacementPolicySatisfied());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue