HDFS-9015. Refactor TestReplicationPolicy to test different block placement policies. (Ming Ma via lei)
(cherry picked from commit 260b9d9410e45dbcb89d97d58450c79220c9e7bc)
This commit is contained in:
parent
7dd9ebdebc
commit
4d01dbda50
|
@ -662,6 +662,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
'CredentialBasedAccessTokenProvider.getCredential()' abstract methods to
|
'CredentialBasedAccessTokenProvider.getCredential()' abstract methods to
|
||||||
public (Santhosh Nayak via cnauroth)
|
public (Santhosh Nayak via cnauroth)
|
||||||
|
|
||||||
|
HDFS-9015. Refactor TestReplicationPolicy to test different block placement
|
||||||
|
policies. (Ming Ma via lei)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -0,0 +1,160 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.PathUtils;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
abstract public class BaseReplicationPolicyTest {
|
||||||
|
{
|
||||||
|
GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected NetworkTopology cluster;
|
||||||
|
protected DatanodeDescriptor dataNodes[];
|
||||||
|
protected static final int BLOCK_SIZE = 1024;
|
||||||
|
protected NameNode namenode;
|
||||||
|
protected DatanodeManager dnManager;
|
||||||
|
protected BlockPlacementPolicy replicator;
|
||||||
|
protected final String filename = "/dummyfile.txt";
|
||||||
|
protected DatanodeStorageInfo[] storages;
|
||||||
|
protected String blockPlacementPolicy;
|
||||||
|
protected NamenodeProtocols nameNodeRpc = null;
|
||||||
|
|
||||||
|
static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
|
||||||
|
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
|
||||||
|
long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
|
||||||
|
int volFailures) {
|
||||||
|
dn.getStorageInfos()[0].setUtilizationForTesting(
|
||||||
|
capacity, dfsUsed, remaining, blockPoolUsed);
|
||||||
|
dn.updateHeartbeat(
|
||||||
|
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
|
||||||
|
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setupCluster() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
dataNodes = getDatanodeDescriptors(conf);
|
||||||
|
|
||||||
|
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
||||||
|
new File(baseDir, "name").getPath());
|
||||||
|
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||||
|
blockPlacementPolicy);
|
||||||
|
conf.setBoolean(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||||
|
conf.setBoolean(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
|
||||||
|
DFSTestUtil.formatNameNode(conf);
|
||||||
|
namenode = new NameNode(conf);
|
||||||
|
nameNodeRpc = namenode.getRpcServer();
|
||||||
|
|
||||||
|
final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
||||||
|
replicator = bm.getBlockPlacementPolicy();
|
||||||
|
cluster = bm.getDatanodeManager().getNetworkTopology();
|
||||||
|
dnManager = bm.getDatanodeManager();
|
||||||
|
// construct network topology
|
||||||
|
for (int i=0; i < dataNodes.length; i++) {
|
||||||
|
cluster.add(dataNodes[i]);
|
||||||
|
//bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]);
|
||||||
|
bm.getDatanodeManager().getHeartbeatManager().addDatanode(
|
||||||
|
dataNodes[i]);
|
||||||
|
}
|
||||||
|
updateHeartbeatWithUsage();
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateHeartbeatWithUsage() {
|
||||||
|
for (int i=0; i < dataNodes.length; i++) {
|
||||||
|
updateHeartbeatWithUsage(dataNodes[i],
|
||||||
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||||
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
namenode.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
|
||||||
|
return isOnSameRack(left, right.getDatanodeDescriptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
|
||||||
|
return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
|
||||||
|
return chooseTarget(numOfReplicas, dataNodes[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||||
|
DatanodeDescriptor writer) {
|
||||||
|
return chooseTarget(numOfReplicas, writer,
|
||||||
|
new ArrayList<DatanodeStorageInfo>());
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||||
|
List<DatanodeStorageInfo> chosenNodes) {
|
||||||
|
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||||
|
DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
|
||||||
|
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||||
|
List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
|
||||||
|
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes,
|
||||||
|
excludedNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
||||||
|
DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes,
|
||||||
|
Set<Node> excludedNodes) {
|
||||||
|
return replicator.chooseTarget(filename, numOfReplicas, writer,
|
||||||
|
chosenNodes, false, excludedNodes, BLOCK_SIZE,
|
||||||
|
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,7 +26,6 @@ import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -39,7 +38,6 @@ import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
@ -55,52 +53,40 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.hadoop.test.PathUtils;
|
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.log4j.spi.LoggingEvent;
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
public class TestReplicationPolicy {
|
@RunWith(Parameterized.class)
|
||||||
{
|
public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final int BLOCK_SIZE = 1024;
|
|
||||||
private static final int NUM_OF_DATANODES = 6;
|
|
||||||
private static NetworkTopology cluster;
|
|
||||||
private static NameNode namenode;
|
|
||||||
private static BlockPlacementPolicy replicator;
|
|
||||||
private static final String filename = "/dummyfile.txt";
|
private static final String filename = "/dummyfile.txt";
|
||||||
private static DatanodeDescriptor[] dataNodes;
|
|
||||||
private static DatanodeStorageInfo[] storages;
|
|
||||||
// The interval for marking a datanode as stale,
|
// The interval for marking a datanode as stale,
|
||||||
private static final long staleInterval =
|
private static final long staleInterval =
|
||||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
|
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public ExpectedException exception = ExpectedException.none();
|
public ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
|
public TestReplicationPolicy(String blockPlacementPolicyClassName) {
|
||||||
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
|
this.blockPlacementPolicy = blockPlacementPolicyClassName;
|
||||||
long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
|
|
||||||
dn.getStorageInfos()[0].setUtilizationForTesting(
|
|
||||||
capacity, dfsUsed, remaining, blockPoolUsed);
|
|
||||||
dn.updateHeartbeat(
|
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
|
|
||||||
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void updateHeartbeatForExtraStorage(long capacity,
|
@Parameterized.Parameters
|
||||||
|
public static Iterable<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
{ BlockPlacementPolicyDefault.class.getName() } });
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateHeartbeatForExtraStorage(long capacity,
|
||||||
long dfsUsed, long remaining, long blockPoolUsed) {
|
long dfsUsed, long remaining, long blockPoolUsed) {
|
||||||
DatanodeDescriptor dn = dataNodes[5];
|
DatanodeDescriptor dn = dataNodes[5];
|
||||||
dn.getStorageInfos()[1].setUtilizationForTesting(
|
dn.getStorageInfos()[1].setUtilizationForTesting(
|
||||||
|
@ -110,9 +96,19 @@ public class TestReplicationPolicy {
|
||||||
0L, 0L, 0, 0, null);
|
0L, 0L, 0, 0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
private void resetHeartbeatForStorages() {
|
||||||
public static void setupCluster() throws Exception {
|
for (int i=0; i < dataNodes.length; i++) {
|
||||||
Configuration conf = new HdfsConfiguration();
|
updateHeartbeatWithUsage(dataNodes[i],
|
||||||
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||||
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L,
|
||||||
|
0, 0);
|
||||||
|
}
|
||||||
|
// No available space in the extra storage of dn0
|
||||||
|
updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
|
||||||
final String[] racks = {
|
final String[] racks = {
|
||||||
"/d1/r1",
|
"/d1/r1",
|
||||||
"/d1/r1",
|
"/d1/r1",
|
||||||
|
@ -121,59 +117,13 @@ public class TestReplicationPolicy {
|
||||||
"/d2/r3",
|
"/d2/r3",
|
||||||
"/d2/r3"};
|
"/d2/r3"};
|
||||||
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
||||||
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
|
|
||||||
|
|
||||||
// create an extra storage for dn5.
|
// create an extra storage for dn5.
|
||||||
DatanodeStorage extraStorage = new DatanodeStorage(
|
DatanodeStorage extraStorage = new DatanodeStorage(
|
||||||
storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
|
storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
|
||||||
StorageType.DEFAULT);
|
StorageType.DEFAULT);
|
||||||
/* DatanodeStorageInfo si = new DatanodeStorageInfo(
|
|
||||||
storages[5].getDatanodeDescriptor(), extraStorage);
|
|
||||||
*/
|
|
||||||
BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
|
BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
|
||||||
extraStorage);
|
extraStorage);
|
||||||
|
return DFSTestUtil.toDatanodeDescriptor(storages);
|
||||||
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
||||||
File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
|
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
||||||
new File(baseDir, "name").getPath());
|
|
||||||
|
|
||||||
conf.setBoolean(
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
|
||||||
conf.setBoolean(
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
|
|
||||||
DFSTestUtil.formatNameNode(conf);
|
|
||||||
namenode = new NameNode(conf);
|
|
||||||
|
|
||||||
final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
|
||||||
replicator = bm.getBlockPlacementPolicy();
|
|
||||||
cluster = bm.getDatanodeManager().getNetworkTopology();
|
|
||||||
// construct network topology
|
|
||||||
for (int i=0; i < NUM_OF_DATANODES; i++) {
|
|
||||||
cluster.add(dataNodes[i]);
|
|
||||||
bm.getDatanodeManager().getHeartbeatManager().addDatanode(
|
|
||||||
dataNodes[i]);
|
|
||||||
}
|
|
||||||
resetHeartbeatForStorages();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void resetHeartbeatForStorages() {
|
|
||||||
for (int i=0; i < NUM_OF_DATANODES; i++) {
|
|
||||||
updateHeartbeatWithUsage(dataNodes[i],
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
|
||||||
}
|
|
||||||
// No available space in the extra storage of dn0
|
|
||||||
updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
|
|
||||||
return isOnSameRack(left, right.getDatanodeDescriptor());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
|
|
||||||
return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -269,40 +219,6 @@ public class TestReplicationPolicy {
|
||||||
resetHeartbeatForStorages();
|
resetHeartbeatForStorages();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
|
|
||||||
return chooseTarget(numOfReplicas, dataNodes[0]);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
DatanodeDescriptor writer) {
|
|
||||||
return chooseTarget(numOfReplicas, writer,
|
|
||||||
new ArrayList<DatanodeStorageInfo>());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
List<DatanodeStorageInfo> chosenNodes) {
|
|
||||||
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
|
|
||||||
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
|
|
||||||
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DatanodeStorageInfo[] chooseTarget(
|
|
||||||
int numOfReplicas,
|
|
||||||
DatanodeDescriptor writer,
|
|
||||||
List<DatanodeStorageInfo> chosenNodes,
|
|
||||||
Set<Node> excludedNodes) {
|
|
||||||
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
|
|
||||||
false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In this testcase, client is dataNodes[0], but the dataNodes[1] is
|
* In this testcase, client is dataNodes[0], but the dataNodes[1] is
|
||||||
* not allowed to be chosen. So the 1st replica should be
|
* not allowed to be chosen. So the 1st replica should be
|
||||||
|
@ -555,7 +471,7 @@ public class TestReplicationPolicy {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
try {
|
try {
|
||||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||||
.setNumStaleNodes(NUM_OF_DATANODES);
|
.setNumStaleNodes(dataNodes.length);
|
||||||
testChooseTargetWithMoreThanAvailableNodes();
|
testChooseTargetWithMoreThanAvailableNodes();
|
||||||
} finally {
|
} finally {
|
||||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||||
|
@ -583,8 +499,8 @@ public class TestReplicationPolicy {
|
||||||
|
|
||||||
// try to choose NUM_OF_DATANODES which is more than actually available
|
// try to choose NUM_OF_DATANODES which is more than actually available
|
||||||
// nodes.
|
// nodes.
|
||||||
DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
|
DatanodeStorageInfo[] targets = chooseTarget(dataNodes.length);
|
||||||
assertEquals(targets.length, NUM_OF_DATANODES - 2);
|
assertEquals(targets.length, dataNodes.length - 2);
|
||||||
|
|
||||||
final List<LoggingEvent> log = appender.getLog();
|
final List<LoggingEvent> log = appender.getLog();
|
||||||
assertNotNull(log);
|
assertNotNull(log);
|
||||||
|
@ -1256,7 +1172,7 @@ public class TestReplicationPolicy {
|
||||||
// Adding this block will increase its current replication, and that will
|
// Adding this block will increase its current replication, and that will
|
||||||
// remove it from the queue.
|
// remove it from the queue.
|
||||||
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
|
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
|
||||||
ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
|
ReplicaState.FINALIZED), storages[0]);
|
||||||
|
|
||||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
|
|
|
@ -20,85 +20,45 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
||||||
import org.apache.hadoop.test.PathUtils;
|
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
public class TestReplicationPolicyConsiderLoad {
|
@RunWith(Parameterized.class)
|
||||||
|
public class TestReplicationPolicyConsiderLoad
|
||||||
|
extends BaseReplicationPolicyTest {
|
||||||
|
|
||||||
private static NameNode namenode;
|
public TestReplicationPolicyConsiderLoad(String blockPlacementPolicy) {
|
||||||
private static DatanodeManager dnManager;
|
this.blockPlacementPolicy = blockPlacementPolicy;
|
||||||
private static List<DatanodeRegistration> dnrList;
|
}
|
||||||
private static DatanodeDescriptor[] dataNodes;
|
|
||||||
private static DatanodeStorageInfo[] storages;
|
|
||||||
|
|
||||||
@BeforeClass
|
@Parameterized.Parameters
|
||||||
public static void setupCluster() throws IOException {
|
public static Iterable<Object[]> data() {
|
||||||
Configuration conf = new HdfsConfiguration();
|
return Arrays.asList(new Object[][] {
|
||||||
|
{ BlockPlacementPolicyDefault.class.getName() } });
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
|
||||||
final String[] racks = {
|
final String[] racks = {
|
||||||
"/rack1",
|
|
||||||
"/rack1",
|
"/rack1",
|
||||||
"/rack1",
|
"/rack1",
|
||||||
"/rack2",
|
"/rack2",
|
||||||
"/rack2",
|
"/rack2",
|
||||||
"/rack2"};
|
"/rack3",
|
||||||
|
"/rack3"};
|
||||||
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
||||||
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
|
return DFSTestUtil.toDatanodeDescriptor(storages);
|
||||||
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
||||||
File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
|
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
||||||
new File(baseDir, "name").getPath());
|
|
||||||
conf.setBoolean(
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
|
||||||
conf.setBoolean(
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
|
|
||||||
conf.setBoolean(
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
|
|
||||||
DFSTestUtil.formatNameNode(conf);
|
|
||||||
namenode = new NameNode(conf);
|
|
||||||
int blockSize = 1024;
|
|
||||||
|
|
||||||
dnrList = new ArrayList<DatanodeRegistration>();
|
|
||||||
dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager();
|
|
||||||
|
|
||||||
// Register DNs
|
|
||||||
for (int i=0; i < 6; i++) {
|
|
||||||
DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i],
|
|
||||||
new StorageInfo(NodeType.DATA_NODE), new ExportedBlockKeys(),
|
|
||||||
VersionInfo.getVersion());
|
|
||||||
dnrList.add(dnr);
|
|
||||||
dnManager.registerDatanode(dnr);
|
|
||||||
dataNodes[i].getStorageInfos()[0].setUtilizationForTesting(
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L,
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L);
|
|
||||||
dataNodes[i].updateHeartbeat(
|
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]),
|
|
||||||
0L, 0L, 0, 0, null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final double EPSILON = 0.0001;
|
private final double EPSILON = 0.0001;
|
||||||
|
@ -110,46 +70,39 @@ public class TestReplicationPolicyConsiderLoad {
|
||||||
public void testChooseTargetWithDecomNodes() throws IOException {
|
public void testChooseTargetWithDecomNodes() throws IOException {
|
||||||
namenode.getNamesystem().writeLock();
|
namenode.getNamesystem().writeLock();
|
||||||
try {
|
try {
|
||||||
String blockPoolId = namenode.getNamesystem().getBlockPoolId();
|
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3],
|
||||||
dnManager.handleHeartbeat(dnrList.get(3),
|
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
|
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
|
||||||
blockPoolId, dataNodes[3].getCacheCapacity(),
|
dataNodes[3].getCacheCapacity(),
|
||||||
dataNodes[3].getCacheRemaining(),
|
dataNodes[3].getCacheUsed(),
|
||||||
2, 0, 0, null);
|
2, 0, null);
|
||||||
dnManager.handleHeartbeat(dnrList.get(4),
|
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4],
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
|
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
|
||||||
blockPoolId, dataNodes[4].getCacheCapacity(),
|
dataNodes[4].getCacheCapacity(),
|
||||||
dataNodes[4].getCacheRemaining(),
|
dataNodes[4].getCacheUsed(),
|
||||||
4, 0, 0, null);
|
4, 0, null);
|
||||||
dnManager.handleHeartbeat(dnrList.get(5),
|
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[5],
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
|
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
|
||||||
blockPoolId, dataNodes[5].getCacheCapacity(),
|
dataNodes[5].getCacheCapacity(),
|
||||||
dataNodes[5].getCacheRemaining(),
|
dataNodes[5].getCacheUsed(),
|
||||||
4, 0, 0, null);
|
4, 0, null);
|
||||||
|
|
||||||
// value in the above heartbeats
|
// value in the above heartbeats
|
||||||
final int load = 2 + 4 + 4;
|
final int load = 2 + 4 + 4;
|
||||||
|
|
||||||
FSNamesystem fsn = namenode.getNamesystem();
|
|
||||||
assertEquals((double)load/6, dnManager.getFSClusterStats()
|
assertEquals((double)load/6, dnManager.getFSClusterStats()
|
||||||
.getInServiceXceiverAverage(), EPSILON);
|
.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
|
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
|
||||||
// returns false
|
// returns false
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
|
DatanodeDescriptor d = dataNodes[i];
|
||||||
dnManager.getDecomManager().startDecommission(d);
|
dnManager.getDecomManager().startDecommission(d);
|
||||||
d.setDecommissioned();
|
d.setDecommissioned();
|
||||||
}
|
}
|
||||||
assertEquals((double)load/3, dnManager.getFSClusterStats()
|
assertEquals((double)load/3, dnManager.getFSClusterStats()
|
||||||
.getInServiceXceiverAverage(), EPSILON);
|
.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
// update references of writer DN to update the de-commissioned state
|
DatanodeDescriptor writerDn = dataNodes[0];
|
||||||
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
|
|
||||||
dnManager.fetchDatanodes(liveNodes, null, false);
|
|
||||||
DatanodeDescriptor writerDn = null;
|
|
||||||
if (liveNodes.contains(dataNodes[0])) {
|
|
||||||
writerDn = liveNodes.get(liveNodes.indexOf(dataNodes[0]));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call chooseTarget()
|
// Call chooseTarget()
|
||||||
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
||||||
|
@ -170,10 +123,4 @@ public class TestReplicationPolicyConsiderLoad {
|
||||||
namenode.getNamesystem().writeUnlock();
|
namenode.getNamesystem().writeUnlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void teardownCluster() {
|
|
||||||
if (namenode != null) namenode.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -32,38 +31,25 @@ import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.test.PathUtils;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
public class TestReplicationPolicyWithNodeGroup {
|
public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest {
|
||||||
private static final int BLOCK_SIZE = 1024;
|
public TestReplicationPolicyWithNodeGroup() {
|
||||||
private static final int NUM_OF_DATANODES = 8;
|
this.blockPlacementPolicy = BlockPlacementPolicyWithNodeGroup.class.getName();
|
||||||
private static final int NUM_OF_DATANODES_BOUNDARY = 6;
|
}
|
||||||
private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
|
|
||||||
private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
|
|
||||||
private final Configuration CONF = new HdfsConfiguration();
|
|
||||||
private NetworkTopology cluster;
|
|
||||||
private NameNode namenode;
|
|
||||||
private BlockPlacementPolicy replicator;
|
|
||||||
private static final String filename = "/dummyfile.txt";
|
|
||||||
|
|
||||||
private static final DatanodeStorageInfo[] storages;
|
@Override
|
||||||
private static final DatanodeDescriptor[] dataNodes;
|
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
|
||||||
static {
|
conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
||||||
|
NetworkTopologyWithNodeGroup.class.getName());
|
||||||
final String[] racks = {
|
final String[] racks = {
|
||||||
"/d1/r1/n1",
|
"/d1/r1/n1",
|
||||||
"/d1/r1/n1",
|
"/d1/r1/n1",
|
||||||
|
@ -75,7 +61,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
"/d2/r3/n6"
|
"/d2/r3/n6"
|
||||||
};
|
};
|
||||||
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
||||||
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
|
return DFSTestUtil.toDatanodeDescriptor(storages);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final DatanodeStorageInfo[] storagesInBoundaryCase;
|
private static final DatanodeStorageInfo[] storagesInBoundaryCase;
|
||||||
|
@ -142,60 +128,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
|
dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
|
|
||||||
CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
||||||
// Set properties to make HDFS aware of NodeGroup.
|
|
||||||
CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
|
||||||
BlockPlacementPolicyWithNodeGroup.class.getName());
|
|
||||||
CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
|
||||||
NetworkTopologyWithNodeGroup.class.getName());
|
|
||||||
|
|
||||||
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
|
|
||||||
|
|
||||||
File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class);
|
|
||||||
|
|
||||||
CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
||||||
new File(baseDir, "name").getPath());
|
|
||||||
|
|
||||||
DFSTestUtil.formatNameNode(CONF);
|
|
||||||
namenode = new NameNode(CONF);
|
|
||||||
final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
|
||||||
replicator = bm.getBlockPlacementPolicy();
|
|
||||||
cluster = bm.getDatanodeManager().getNetworkTopology();
|
|
||||||
// construct network topology
|
|
||||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
|
||||||
cluster.add(dataNodes[i]);
|
|
||||||
}
|
|
||||||
setupDataNodeCapacity();
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
namenode.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
|
|
||||||
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
|
|
||||||
long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
|
|
||||||
int volFailures) {
|
|
||||||
dn.getStorageInfos()[0].setUtilizationForTesting(
|
|
||||||
capacity, dfsUsed, remaining, blockPoolUsed);
|
|
||||||
dn.updateHeartbeat(
|
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
|
|
||||||
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setupDataNodeCapacity() {
|
|
||||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
|
||||||
updateHeartbeatWithUsage(dataNodes[i],
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scan the targets list: all targets should be on different NodeGroups.
|
* Scan the targets list: all targets should be on different NodeGroups.
|
||||||
* Return false if two targets are found on the same NodeGroup.
|
* Return false if two targets are found on the same NodeGroup.
|
||||||
|
@ -217,10 +150,6 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
|
|
||||||
return isOnSameRack(left.getDatanodeDescriptor(), right);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
|
private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
|
||||||
return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
|
return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
|
||||||
}
|
}
|
||||||
|
@ -233,35 +162,6 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
|
return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
|
|
||||||
return chooseTarget(numOfReplicas, dataNodes[0]);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
DatanodeDescriptor writer) {
|
|
||||||
return chooseTarget(numOfReplicas, writer,
|
|
||||||
new ArrayList<DatanodeStorageInfo>());
|
|
||||||
}
|
|
||||||
|
|
||||||
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
List<DatanodeStorageInfo> chosenNodes) {
|
|
||||||
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
|
|
||||||
DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
|
|
||||||
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DatanodeStorageInfo[] chooseTarget(
|
|
||||||
int numOfReplicas,
|
|
||||||
DatanodeDescriptor writer,
|
|
||||||
List<DatanodeStorageInfo> chosenNodes,
|
|
||||||
Set<Node> excludedNodes) {
|
|
||||||
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
|
|
||||||
false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In this testcase, client is dataNodes[0]. So the 1st replica should be
|
* In this testcase, client is dataNodes[0]. So the 1st replica should be
|
||||||
* placed on dataNodes[0], the 2nd replica should be placed on
|
* placed on dataNodes[0], the 2nd replica should be placed on
|
||||||
|
@ -467,7 +367,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testChooseTarget5() throws Exception {
|
public void testChooseTarget5() throws Exception {
|
||||||
setupDataNodeCapacity();
|
updateHeartbeatWithUsage();
|
||||||
DatanodeStorageInfo[] targets;
|
DatanodeStorageInfo[] targets;
|
||||||
targets = chooseTarget(0, NODE);
|
targets = chooseTarget(0, NODE);
|
||||||
assertEquals(targets.length, 0);
|
assertEquals(targets.length, 0);
|
||||||
|
@ -514,7 +414,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRereplicate1() throws Exception {
|
public void testRereplicate1() throws Exception {
|
||||||
setupDataNodeCapacity();
|
updateHeartbeatWithUsage();
|
||||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||||
chosenNodes.add(storages[0]);
|
chosenNodes.add(storages[0]);
|
||||||
DatanodeStorageInfo[] targets;
|
DatanodeStorageInfo[] targets;
|
||||||
|
@ -547,7 +447,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRereplicate2() throws Exception {
|
public void testRereplicate2() throws Exception {
|
||||||
setupDataNodeCapacity();
|
updateHeartbeatWithUsage();
|
||||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||||
chosenNodes.add(storages[0]);
|
chosenNodes.add(storages[0]);
|
||||||
chosenNodes.add(storages[1]);
|
chosenNodes.add(storages[1]);
|
||||||
|
@ -575,7 +475,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRereplicate3() throws Exception {
|
public void testRereplicate3() throws Exception {
|
||||||
setupDataNodeCapacity();
|
updateHeartbeatWithUsage();
|
||||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||||
chosenNodes.add(storages[0]);
|
chosenNodes.add(storages[0]);
|
||||||
chosenNodes.add(storages[3]);
|
chosenNodes.add(storages[3]);
|
||||||
|
@ -671,19 +571,14 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testChooseTargetsOnBoundaryTopology() throws Exception {
|
public void testChooseTargetsOnBoundaryTopology() throws Exception {
|
||||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
for(int i=0; i<dataNodes.length; i++) {
|
||||||
cluster.remove(dataNodes[i]);
|
cluster.remove(dataNodes[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
|
for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
|
||||||
cluster.add(dataNodesInBoundaryCase[i]);
|
cluster.add(dataNodesInBoundaryCase[i]);
|
||||||
}
|
}
|
||||||
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
|
for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
|
||||||
updateHeartbeatWithUsage(dataNodes[0],
|
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
|
||||||
(HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE,
|
|
||||||
0L, 0L, 0L, 0, 0);
|
|
||||||
|
|
||||||
updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
|
updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||||
|
@ -714,7 +609,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRereplicateOnBoundaryTopology() throws Exception {
|
public void testRereplicateOnBoundaryTopology() throws Exception {
|
||||||
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
|
for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
|
||||||
updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
|
updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||||
|
@ -738,21 +633,21 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testChooseMoreTargetsThanNodeGroups() throws Exception {
|
public void testChooseMoreTargetsThanNodeGroups() throws Exception {
|
||||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
for(int i=0; i<dataNodes.length; i++) {
|
||||||
cluster.remove(dataNodes[i]);
|
cluster.remove(dataNodes[i]);
|
||||||
}
|
}
|
||||||
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
|
for(int i=0; i<dataNodesInBoundaryCase.length; i++) {
|
||||||
DatanodeDescriptor node = dataNodesInBoundaryCase[i];
|
DatanodeDescriptor node = dataNodesInBoundaryCase[i];
|
||||||
if (cluster.contains(node)) {
|
if (cluster.contains(node)) {
|
||||||
cluster.remove(node);
|
cluster.remove(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
|
for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) {
|
||||||
cluster.add(dataNodesInMoreTargetsCase[i]);
|
cluster.add(dataNodesInMoreTargetsCase[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
|
for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) {
|
||||||
updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
|
updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||||
|
@ -773,11 +668,11 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChooseTargetWithDependencies() throws Exception {
|
public void testChooseTargetWithDependencies() throws Exception {
|
||||||
for(int i=0; i<NUM_OF_DATANODES; i++) {
|
for(int i=0; i<dataNodes.length; i++) {
|
||||||
cluster.remove(dataNodes[i]);
|
cluster.remove(dataNodes[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
|
for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) {
|
||||||
DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
|
DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
|
||||||
if (cluster.contains(node)) {
|
if (cluster.contains(node)) {
|
||||||
cluster.remove(node);
|
cluster.remove(node);
|
||||||
|
@ -787,7 +682,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
|
Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
|
||||||
.getBlockManager()
|
.getBlockManager()
|
||||||
.getDatanodeManager().getHost2DatanodeMap();
|
.getDatanodeManager().getHost2DatanodeMap();
|
||||||
for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
|
for(int i=0; i<dataNodesForDependencies.length; i++) {
|
||||||
cluster.add(dataNodesForDependencies[i]);
|
cluster.add(dataNodesForDependencies[i]);
|
||||||
host2DatanodeMap.add(dataNodesForDependencies[i]);
|
host2DatanodeMap.add(dataNodesForDependencies[i]);
|
||||||
}
|
}
|
||||||
|
@ -803,7 +698,7 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
dataNodesForDependencies[3].getHostName());
|
dataNodesForDependencies[3].getHostName());
|
||||||
|
|
||||||
//Update heartbeat
|
//Update heartbeat
|
||||||
for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
|
for(int i=0; i<dataNodesForDependencies.length; i++) {
|
||||||
updateHeartbeatWithUsage(dataNodesForDependencies[i],
|
updateHeartbeatWithUsage(dataNodesForDependencies[i],
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||||
|
@ -825,8 +720,8 @@ public class TestReplicationPolicyWithNodeGroup {
|
||||||
assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4]));
|
assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4]));
|
||||||
|
|
||||||
//verify that all data nodes are in the excluded list
|
//verify that all data nodes are in the excluded list
|
||||||
assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
|
assertEquals(excludedNodes.size(), dataNodesForDependencies.length);
|
||||||
for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
|
for(int i=0; i<dataNodesForDependencies.length; i++) {
|
||||||
assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
|
assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue