HDFS-9922. Upgrade Domain placement policy status marks a good block in violation when there are decommissioned nodes. (Chris Trezzo via mingma)

This commit is contained in:
Ming Ma 2016-06-15 22:00:52 -07:00
parent 5dfc38ff57
commit b48f27e794
3 changed files with 210 additions and 38 deletions

View File

@ -60,7 +60,7 @@ public class BlockPlacementStatusWithUpgradeDomain implements
private boolean isUpgradeDomainPolicySatisfied() { private boolean isUpgradeDomainPolicySatisfied() {
if (numberOfReplicas <= upgradeDomainFactor) { if (numberOfReplicas <= upgradeDomainFactor) {
return (numberOfReplicas == upgradeDomains.size()); return (numberOfReplicas <= upgradeDomains.size());
} else { } else {
return upgradeDomains.size() >= upgradeDomainFactor; return upgradeDomains.size() >= upgradeDomainFactor;
} }

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
/**
* Unit tests for BlockPlacementStatusWithUpgradeDomain class.
*/
public class TestBlockPlacementStatusWithUpgradeDomain {
private Set<String> upgradeDomains;
private BlockPlacementStatusDefault bpsd =
mock(BlockPlacementStatusDefault.class);
@Before
public void setup() {
upgradeDomains = new HashSet<String>();
upgradeDomains.add("1");
upgradeDomains.add("2");
upgradeDomains.add("3");
when(bpsd.isPlacementPolicySatisfied()).thenReturn(true);
}
@Test
public void testIsPolicySatisfiedParentFalse() {
when(bpsd.isPlacementPolicySatisfied()).thenReturn(false);
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
// Parent policy is not satisfied but upgrade domain policy is
assertFalse(bps.isPlacementPolicySatisfied());
}
@Test
public void testIsPolicySatisfiedAllEqual() {
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
// Number of domains, replicas and upgradeDomainFactor is equal and parent
// policy is satisfied
assertTrue(bps.isPlacementPolicySatisfied());
}
@Test
public void testIsPolicySatisifedSmallDomains() {
// Number of domains is less than replicas but equal to factor
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3);
assertTrue(bps.isPlacementPolicySatisfied());
// Same as above but replicas is greater than factor
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 2);
assertTrue(bps.isPlacementPolicySatisfied());
// Number of domains is less than replicas and factor
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 4);
assertFalse(bps.isPlacementPolicySatisfied());
}
}

View File

@ -17,38 +17,40 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
/** /**
* End-to-end test case for upgrade domain * End-to-end test case for upgrade domain
* The test configs upgrade domain for nodes via admin json * The test configs upgrade domain for nodes via admin json
@ -69,15 +71,16 @@ public class TestUpgradeDomainBlockPlacementPolicy {
* CombinedHostFileManager won't allow those hosts. * CombinedHostFileManager won't allow those hosts.
*/ */
static final String[] hosts = static final String[] hosts =
{ "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1", {"127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1",
"127.0.0.1", "127.0.0.1" }; "127.0.0.1", "127.0.0.1"};
static final String[] upgradeDomains = static final String[] upgradeDomains =
{ "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" }; {"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>(); static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
private MiniDFSCluster cluster = null; private MiniDFSCluster cluster = null;
private NamenodeProtocols nameNodeRpc = null; private NamenodeProtocols nameNodeRpc = null;
private FSNamesystem namesystem = null; private FSNamesystem namesystem = null;
private PermissionStatus perm = null; private PermissionStatus perm = null;
private HostsFileWriter hostsFileWriter = new HostsFileWriter();
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
@ -86,11 +89,10 @@ public class TestUpgradeDomainBlockPlacementPolicy {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithUpgradeDomain.class, BlockPlacementPolicyWithUpgradeDomain.class,
BlockPlacementPolicy.class); BlockPlacementPolicy.class);
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class); CombinedHostFileManager.class, HostConfigManager.class);
HostsFileWriter hostsFileWriter = new HostsFileWriter();
hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy"); hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
@ -100,12 +102,12 @@ public class TestUpgradeDomainBlockPlacementPolicy {
namesystem = cluster.getNamesystem(); namesystem = cluster.getNamesystem();
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null, perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
FsPermission.getDefault()); FsPermission.getDefault());
refreshDatanodeAdminProperties(hostsFileWriter); refreshDatanodeAdminProperties();
hostsFileWriter.cleanup();
} }
@After @After
public void teardown() { public void teardown() throws IOException {
hostsFileWriter.cleanup();
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
cluster = null; cluster = null;
@ -114,15 +116,18 @@ public class TestUpgradeDomainBlockPlacementPolicy {
/** /**
* Define admin properties for these datanodes as follows. * Define admin properties for these datanodes as follows.
* dn0 and dn3 have upgrade domain ud1. * dn0's upgrade domain is ud5.
* dn1 and dn4 have upgrade domain ud2. * dn1's upgrade domain is ud2.
* dn2 and dn5 have upgrade domain ud3. * dn2's upgrade domain is ud3.
* dn3's upgrade domain is ud1.
* dn4's upgrade domain is ud2.
* dn5's upgrade domain is ud4.
* dn0 and dn5 are decommissioned. * dn0 and dn5 are decommissioned.
* Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
* rack2. Then any block's replicas should be on either * rack2. Then any block's replicas should be on either
* {dn1, dn2, d3} or {dn2, dn3, dn4}. * {dn1, dn2, d3} or {dn2, dn3, dn4}.
*/ */
private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter) private void refreshDatanodeAdminProperties()
throws IOException { throws IOException {
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[ DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
hosts.length]; hosts.length];
@ -138,32 +143,116 @@ public class TestUpgradeDomainBlockPlacementPolicy {
hostsFileWriter.initIncludeHosts(datanodes); hostsFileWriter.initIncludeHosts(datanodes);
cluster.getFileSystem().refreshNodes(); cluster.getFileSystem().refreshNodes();
expectedDatanodeIDs.clear();
expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId()); expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId()); expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
} }
/**
* Define admin properties for these datanodes as follows.
* dn0's upgrade domain is ud5.
* dn1's upgrade domain is ud2.
* dn2's upgrade domain is ud3.
* dn3's upgrade domain is ud1.
* dn4's upgrade domain is ud2.
* dn5's upgrade domain is ud4.
* dn2 and dn3 are decommissioned.
* Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
* rack2. Then any block's replicas should be on either
* {dn0, dn1, d5} or {dn0, dn4, dn5}.
*/
private void refreshDatanodeAdminProperties2()
throws IOException {
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
hosts.length];
for (int i = 0; i < hosts.length; i++) {
datanodes[i] = new DatanodeAdminProperties();
DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
datanodes[i].setHostName(datanodeID.getHostName());
datanodes[i].setPort(datanodeID.getXferPort());
datanodes[i].setUpgradeDomain(upgradeDomains[i]);
}
datanodes[2].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
datanodes[3].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
hostsFileWriter.initIncludeHosts(datanodes);
cluster.getFileSystem().refreshNodes();
expectedDatanodeIDs.clear();
expectedDatanodeIDs.add(cluster.getDataNodes().get(0).getDatanodeId());
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
}
@Test @Test
public void testPlacement() throws Exception { public void testPlacement() throws Exception {
String clientMachine = "127.0.0.1"; final long fileSize = DEFAULT_BLOCK_SIZE * 5;
for (int i = 0; i < 5; i++) { final String testFile = new String("/testfile");
String src = "/test-" + i; final Path path = new Path(testFile);
// Create the file with client machine DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, REPLICATION_FACTOR, 1000L);
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, LocatedBlocks locatedBlocks =
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); cluster.getFileSystem().getClient().getLocatedBlocks(
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, path.toString(), 0, fileSize);
null, null, fileStatus.getFileId(), null, null); for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>();
assertEquals("Block should be allocated sufficient locations", for(DatanodeInfo datanodeInfo : block.getLocations()) {
REPLICATION_FACTOR, locatedBlock.getLocations().length); if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) {
Set<DatanodeInfo> locs = new HashSet<>(Arrays.asList( locs.add(datanodeInfo);
locatedBlock.getLocations())); }
for (DatanodeID datanodeID : expectedDatanodeIDs) {
locs.contains(datanodeID);
} }
for (DatanodeID datanodeID : expectedDatanodeIDs) {
assertTrue(locs.contains(datanodeID));
}
}
}
nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), @Test(timeout = 300000)
src, clientMachine); public void testPlacementAfterDecommission() throws Exception {
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
final String testFile = new String("/testfile");
final Path path = new Path(testFile);
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
REPLICATION_FACTOR, 1000L);
// Decommission some nodes and wait until decommissions have finished.
refreshDatanodeAdminProperties2();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
boolean successful = true;
LocatedBlocks locatedBlocks;
try {
locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
} catch (IOException ioe) {
return false;
}
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>();
for (DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState() ==
DatanodeInfo.AdminStates.NORMAL) {
locs.add(datanodeInfo);
}
}
for (DatanodeID datanodeID : expectedDatanodeIDs) {
successful = successful && locs.contains(datanodeID);
}
}
return successful;
}
}, 1000, 60000);
// Verify block placement policy of each block.
LocatedBlocks locatedBlocks;
locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
BlockPlacementStatus status = cluster.getNamesystem().getBlockManager().
getBlockPlacementPolicy().verifyBlockPlacement(
block.getLocations(), REPLICATION_FACTOR);
assertTrue(status.isPlacementPolicySatisfied());
} }
} }
} }