HDFS-9922. Upgrade Domain placement policy status marks a good block in violation when there are decommissioned nodes. (Chris Trezzo via mingma)
(cherry picked from commit b48f27e794
)
This commit is contained in:
parent
6dd34baf3c
commit
32b115da1d
|
@ -60,7 +60,7 @@ public class BlockPlacementStatusWithUpgradeDomain implements
|
||||||
|
|
||||||
private boolean isUpgradeDomainPolicySatisfied() {
|
private boolean isUpgradeDomainPolicySatisfied() {
|
||||||
if (numberOfReplicas <= upgradeDomainFactor) {
|
if (numberOfReplicas <= upgradeDomainFactor) {
|
||||||
return (numberOfReplicas == upgradeDomains.size());
|
return (numberOfReplicas <= upgradeDomains.size());
|
||||||
} else {
|
} else {
|
||||||
return upgradeDomains.size() >= upgradeDomainFactor;
|
return upgradeDomains.size() >= upgradeDomainFactor;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for BlockPlacementStatusWithUpgradeDomain class.
|
||||||
|
*/
|
||||||
|
public class TestBlockPlacementStatusWithUpgradeDomain {
|
||||||
|
|
||||||
|
private Set<String> upgradeDomains;
|
||||||
|
private BlockPlacementStatusDefault bpsd =
|
||||||
|
mock(BlockPlacementStatusDefault.class);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
upgradeDomains = new HashSet<String>();
|
||||||
|
upgradeDomains.add("1");
|
||||||
|
upgradeDomains.add("2");
|
||||||
|
upgradeDomains.add("3");
|
||||||
|
when(bpsd.isPlacementPolicySatisfied()).thenReturn(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsPolicySatisfiedParentFalse() {
|
||||||
|
when(bpsd.isPlacementPolicySatisfied()).thenReturn(false);
|
||||||
|
BlockPlacementStatusWithUpgradeDomain bps =
|
||||||
|
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
|
||||||
|
|
||||||
|
// Parent policy is not satisfied but upgrade domain policy is
|
||||||
|
assertFalse(bps.isPlacementPolicySatisfied());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsPolicySatisfiedAllEqual() {
|
||||||
|
BlockPlacementStatusWithUpgradeDomain bps =
|
||||||
|
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
|
||||||
|
// Number of domains, replicas and upgradeDomainFactor is equal and parent
|
||||||
|
// policy is satisfied
|
||||||
|
assertTrue(bps.isPlacementPolicySatisfied());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsPolicySatisifedSmallDomains() {
|
||||||
|
// Number of domains is less than replicas but equal to factor
|
||||||
|
BlockPlacementStatusWithUpgradeDomain bps =
|
||||||
|
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3);
|
||||||
|
assertTrue(bps.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
|
// Same as above but replicas is greater than factor
|
||||||
|
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 2);
|
||||||
|
assertTrue(bps.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
|
// Number of domains is less than replicas and factor
|
||||||
|
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 4);
|
||||||
|
assertFalse(bps.isPlacementPolicySatisfied());
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,38 +17,40 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
|
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
||||||
import org.apache.hadoop.net.StaticMapping;
|
import org.apache.hadoop.net.StaticMapping;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* End-to-end test case for upgrade domain
|
* End-to-end test case for upgrade domain
|
||||||
* The test configs upgrade domain for nodes via admin json
|
* The test configs upgrade domain for nodes via admin json
|
||||||
|
@ -72,12 +74,13 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
{"127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1",
|
{"127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1",
|
||||||
"127.0.0.1", "127.0.0.1"};
|
"127.0.0.1", "127.0.0.1"};
|
||||||
static final String[] upgradeDomains =
|
static final String[] upgradeDomains =
|
||||||
{ "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" };
|
{"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
|
||||||
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
|
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
|
||||||
private MiniDFSCluster cluster = null;
|
private MiniDFSCluster cluster = null;
|
||||||
private NamenodeProtocols nameNodeRpc = null;
|
private NamenodeProtocols nameNodeRpc = null;
|
||||||
private FSNamesystem namesystem = null;
|
private FSNamesystem namesystem = null;
|
||||||
private PermissionStatus perm = null;
|
private PermissionStatus perm = null;
|
||||||
|
private HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
|
@ -90,7 +93,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
BlockPlacementPolicy.class);
|
BlockPlacementPolicy.class);
|
||||||
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
|
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
|
||||||
CombinedHostFileManager.class, HostConfigManager.class);
|
CombinedHostFileManager.class, HostConfigManager.class);
|
||||||
HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
|
||||||
hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
|
hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy");
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
|
||||||
|
@ -100,12 +102,12 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
namesystem = cluster.getNamesystem();
|
namesystem = cluster.getNamesystem();
|
||||||
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
|
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
|
||||||
FsPermission.getDefault());
|
FsPermission.getDefault());
|
||||||
refreshDatanodeAdminProperties(hostsFileWriter);
|
refreshDatanodeAdminProperties();
|
||||||
hostsFileWriter.cleanup();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void teardown() {
|
public void teardown() throws IOException {
|
||||||
|
hostsFileWriter.cleanup();
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
cluster = null;
|
cluster = null;
|
||||||
|
@ -114,15 +116,18 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Define admin properties for these datanodes as follows.
|
* Define admin properties for these datanodes as follows.
|
||||||
* dn0 and dn3 have upgrade domain ud1.
|
* dn0's upgrade domain is ud5.
|
||||||
* dn1 and dn4 have upgrade domain ud2.
|
* dn1's upgrade domain is ud2.
|
||||||
* dn2 and dn5 have upgrade domain ud3.
|
* dn2's upgrade domain is ud3.
|
||||||
|
* dn3's upgrade domain is ud1.
|
||||||
|
* dn4's upgrade domain is ud2.
|
||||||
|
* dn5's upgrade domain is ud4.
|
||||||
* dn0 and dn5 are decommissioned.
|
* dn0 and dn5 are decommissioned.
|
||||||
* Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
|
* Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
|
||||||
* rack2. Then any block's replicas should be on either
|
* rack2. Then any block's replicas should be on either
|
||||||
* {dn1, dn2, d3} or {dn2, dn3, dn4}.
|
* {dn1, dn2, d3} or {dn2, dn3, dn4}.
|
||||||
*/
|
*/
|
||||||
private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter)
|
private void refreshDatanodeAdminProperties()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
|
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
|
||||||
hosts.length];
|
hosts.length];
|
||||||
|
@ -138,32 +143,116 @@ public class TestUpgradeDomainBlockPlacementPolicy {
|
||||||
hostsFileWriter.initIncludeHosts(datanodes);
|
hostsFileWriter.initIncludeHosts(datanodes);
|
||||||
cluster.getFileSystem().refreshNodes();
|
cluster.getFileSystem().refreshNodes();
|
||||||
|
|
||||||
|
expectedDatanodeIDs.clear();
|
||||||
expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
|
expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId());
|
||||||
expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
|
expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Define admin properties for these datanodes as follows.
|
||||||
|
* dn0's upgrade domain is ud5.
|
||||||
|
* dn1's upgrade domain is ud2.
|
||||||
|
* dn2's upgrade domain is ud3.
|
||||||
|
* dn3's upgrade domain is ud1.
|
||||||
|
* dn4's upgrade domain is ud2.
|
||||||
|
* dn5's upgrade domain is ud4.
|
||||||
|
* dn2 and dn3 are decommissioned.
|
||||||
|
* Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on
|
||||||
|
* rack2. Then any block's replicas should be on either
|
||||||
|
* {dn0, dn1, d5} or {dn0, dn4, dn5}.
|
||||||
|
*/
|
||||||
|
private void refreshDatanodeAdminProperties2()
|
||||||
|
throws IOException {
|
||||||
|
DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[
|
||||||
|
hosts.length];
|
||||||
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
|
datanodes[i] = new DatanodeAdminProperties();
|
||||||
|
DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId();
|
||||||
|
datanodes[i].setHostName(datanodeID.getHostName());
|
||||||
|
datanodes[i].setPort(datanodeID.getXferPort());
|
||||||
|
datanodes[i].setUpgradeDomain(upgradeDomains[i]);
|
||||||
|
}
|
||||||
|
datanodes[2].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
|
||||||
|
datanodes[3].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED);
|
||||||
|
hostsFileWriter.initIncludeHosts(datanodes);
|
||||||
|
cluster.getFileSystem().refreshNodes();
|
||||||
|
|
||||||
|
expectedDatanodeIDs.clear();
|
||||||
|
expectedDatanodeIDs.add(cluster.getDataNodes().get(0).getDatanodeId());
|
||||||
|
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPlacement() throws Exception {
|
public void testPlacement() throws Exception {
|
||||||
String clientMachine = "127.0.0.1";
|
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
|
||||||
for (int i = 0; i < 5; i++) {
|
final String testFile = new String("/testfile");
|
||||||
String src = "/test-" + i;
|
final Path path = new Path(testFile);
|
||||||
// Create the file with client machine
|
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
|
||||||
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
|
REPLICATION_FACTOR, 1000L);
|
||||||
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
|
LocatedBlocks locatedBlocks =
|
||||||
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
|
cluster.getFileSystem().getClient().getLocatedBlocks(
|
||||||
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
|
path.toString(), 0, fileSize);
|
||||||
null, null, fileStatus.getFileId(), null, null);
|
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
Set<DatanodeInfo> locs = new HashSet<>();
|
||||||
assertEquals("Block should be allocated sufficient locations",
|
for(DatanodeInfo datanodeInfo : block.getLocations()) {
|
||||||
REPLICATION_FACTOR, locatedBlock.getLocations().length);
|
if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) {
|
||||||
Set<DatanodeInfo> locs = new HashSet<>(Arrays.asList(
|
locs.add(datanodeInfo);
|
||||||
locatedBlock.getLocations()));
|
}
|
||||||
|
}
|
||||||
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
||||||
locs.contains(datanodeID);
|
assertTrue(locs.contains(datanodeID));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
|
@Test(timeout = 300000)
|
||||||
src, clientMachine);
|
public void testPlacementAfterDecommission() throws Exception {
|
||||||
|
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
|
||||||
|
final String testFile = new String("/testfile");
|
||||||
|
final Path path = new Path(testFile);
|
||||||
|
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
|
||||||
|
REPLICATION_FACTOR, 1000L);
|
||||||
|
|
||||||
|
// Decommission some nodes and wait until decommissions have finished.
|
||||||
|
refreshDatanodeAdminProperties2();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
boolean successful = true;
|
||||||
|
LocatedBlocks locatedBlocks;
|
||||||
|
try {
|
||||||
|
locatedBlocks =
|
||||||
|
cluster.getFileSystem().getClient().getLocatedBlocks(
|
||||||
|
path.toString(), 0, fileSize);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
Set<DatanodeInfo> locs = new HashSet<>();
|
||||||
|
for (DatanodeInfo datanodeInfo : block.getLocations()) {
|
||||||
|
if (datanodeInfo.getAdminState() ==
|
||||||
|
DatanodeInfo.AdminStates.NORMAL) {
|
||||||
|
locs.add(datanodeInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (DatanodeID datanodeID : expectedDatanodeIDs) {
|
||||||
|
successful = successful && locs.contains(datanodeID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return successful;
|
||||||
|
}
|
||||||
|
}, 1000, 60000);
|
||||||
|
|
||||||
|
// Verify block placement policy of each block.
|
||||||
|
LocatedBlocks locatedBlocks;
|
||||||
|
locatedBlocks =
|
||||||
|
cluster.getFileSystem().getClient().getLocatedBlocks(
|
||||||
|
path.toString(), 0, fileSize);
|
||||||
|
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
BlockPlacementStatus status = cluster.getNamesystem().getBlockManager().
|
||||||
|
getBlockPlacementPolicy().verifyBlockPlacement(
|
||||||
|
block.getLocations(), REPLICATION_FACTOR);
|
||||||
|
assertTrue(status.isPlacementPolicySatisfied());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue