HDFS-16575. [SPS] Should use the real replication number instead of getReplication from the namenode

liubingxing 2022-05-10 12:02:42 +08:00
parent d486ae8c0f
commit ff3d47bea3
2 changed files with 115 additions and 1 deletion

StoragePolicySatisfier.java

@@ -402,7 +402,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
          }
        } else {
          expectedStorageTypes = existingStoragePolicy
-              .chooseStorageTypes(fileInfo.getReplication());
+              .chooseStorageTypes((short) blockInfo.getLocations().length);
        }
        List<StorageType> existing = new LinkedList<StorageType>(
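
The one-line change above matters when a block temporarily has more replicas than the file's configured replication factor, for example while the extra replicas created for decommissioning datanodes are still live. The sketch below is a standalone illustration of the sizing difference, not HDFS source; the class name, the literal values, and the "DISK" placeholder are assumptions made only for the example:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Standalone illustration of the sizing problem fixed above; not HDFS code.
public class ExpectedTypesSketch {
  public static void main(String[] args) {
    short configuredReplication = 3; // what fileInfo.getReplication() reports
    int actualLocations = 5;         // blockInfo.getLocations().length, e.g.
                                     // 3 original replicas + 2 re-replicated
                                     // during decommission

    // Old sizing: only 3 expected storage types, so the extra replicas have
    // no expected entry to be matched against.
    List<String> expectedOld =
        new ArrayList<>(Collections.nCopies(configuredReplication, "DISK"));

    // New sizing: one expected storage type per real replica location.
    List<String> expectedNew =
        new ArrayList<>(Collections.nCopies(actualLocations, "DISK"));

    System.out.println("expected types (old) = " + expectedOld.size()); // 3
    System.out.println("expected types (new) = " + expectedNew.size()); // 5
  }
}

The new test added below exercises exactly this case: after two datanodes are decommissioned the block has five replica locations while the file's replication factor is still three.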

TestExternalStoragePolicySatisfier.java

@@ -48,6 +48,7 @@ import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
@@ -59,6 +60,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -82,6 +84,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
@@ -95,6 +98,7 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.ExitUtil;
import org.junit.After;
import org.junit.Assert;
@@ -1851,4 +1855,114 @@ public class TestExternalStoragePolicySatisfier {
      shutdownCluster();
    }
  }
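
  // Writes the given hostnames, one per line, to an include/exclude file for
  // the DatanodeManager; a null list produces an empty file.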
  private void writeConfigFile(FileSystem localFs, Path name, List<String> nodes)
      throws IOException {
    // delete if it already exists
    if (localFs.exists(name)) {
      localFs.delete(name, true);
    }
    FSDataOutputStream stm = localFs.create(name);
    if (nodes != null) {
      for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
        String node = it.next();
        stm.writeBytes(node);
        stm.writeBytes("\n");
      }
    }
    stm.close();
  }
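
  // Polls the datanode's admin state until it reaches the expected state
  // (e.g. DECOMMISSIONED), sleeping 500 ms between checks.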
  private void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
    boolean done = state == node.getAdminState();
    while (!done) {
      LOG.info("Waiting for node " + node + " to change state to " + state
          + " current state: " + node.getAdminState());
      try {
        Thread.sleep(1 * 500);
      } catch (InterruptedException e) {
        // nothing
      }
      done = state == node.getAdminState();
    }
    LOG.info("node " + node + " reached the state " + state);
  }
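
  // Regression test for HDFS-16575: after decommissioning two datanodes the
  // block has five replica locations while the file's replication factor is
  // still three, so SPS must size the expected storage types by the real
  // location count in order to move every replica back to DISK.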
  @Test
  public void testSPSWithDecommission() throws Exception {
    try {
      FileSystem localFs = FileSystem.getLocal(config);
      Path workingDir = localFs.getWorkingDirectory();
      Path decommissionDir = new Path(workingDir,
          PathUtils.getTestDirName(getClass()) + "/work-dir/decommission");
      Path hostsFile = new Path(decommissionDir, "hosts");
      Path excludeFile = new Path(decommissionDir, "exclude");
      writeConfigFile(localFs, hostsFile, null);
      writeConfigFile(localFs, excludeFile, null);
      config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
      config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());

      // start a cluster with 5 datanodes
      final int numOfDn = 5;
      StorageType[][] newtypes = new StorageType[][]{
          {StorageType.SSD, StorageType.DISK},
          {StorageType.SSD, StorageType.DISK},
          {StorageType.SSD, StorageType.DISK},
          {StorageType.SSD, StorageType.DISK},
          {StorageType.SSD, StorageType.DISK}};
      hdfsCluster = startCluster(config, newtypes, numOfDn, 2, CAPACITY);
      hdfsCluster.waitActive();

      // write a file with replication 3 [DISK, DISK, DISK]
      dfs = hdfsCluster.getFileSystem();
      String file = "/testSPSWithDecommissionFile";
      Path filePath = new Path(file);
      writeContent(file, (short) 3);

      // select 2 datanodes for decommission
      ArrayList<String> excludedList = new ArrayList<>();
      DFSClient client = new DFSClient(hdfsCluster.getNameNode(0)
          .getNameNodeAddress(), config);
      DatanodeInfo[] info =
          client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
      int count = 0;
      int total = 2;
      int[] arr = new int[total];
      for (int j = 0, i = 0; i < info.length; ++i) {
        if (count < total && info[i].getNumBlocks() != 0) {
          excludedList.add(info[i].getXferAddr());
          count++;
          arr[j++] = i;
        }
      }

      // before decommission, set the path to the ALL_SSD storage policy
      dfs.setStoragePolicy(filePath, "ALL_SSD");

      // decommission the selected nodes; the replicas will become
      // [DISK, DISK, DISK, SSD, SSD]
      writeConfigFile(localFs, excludeFile, excludedList);
      hdfsCluster.getNamesystem(0)
          .getBlockManager()
          .getDatanodeManager()
          .refreshNodes(config);
      for (int index = 0; index < arr.length; ++index) {
        DatanodeInfo ret = NameNodeAdapter
            .getDatanode(hdfsCluster.getNamesystem(0), info[arr[index]]);
        waitNodeState(ret, DatanodeInfo.AdminStates.DECOMMISSIONED);
      }

      // after decommission, set the path back to the HOT storage policy
      dfs.setStoragePolicy(filePath, "HOT");
      dfs.satisfyStoragePolicy(filePath);

      // the replicas should no longer be of SSD type
      hdfsCluster.triggerHeartbeats();
      DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 5, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }
}