HDFS-11965: [SPS]: Should give chance to satisfy the low redundant blocks before removing the xattr. Contributed by Surendra Singh Lilhore.
parent 00cf207192
commit 0b360b16ab
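For context: the code paths changed below run after a client asks the NameNode's StoragePolicySatisfier (SPS) to act on a path. A minimal sketch of that trigger, assuming a running HDFS cluster with SPS enabled; the class name, path, and policy value are illustrative and not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SatisfyPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    Path file = new Path("/data/report.csv");   // illustrative path
    // Mark the file COLD, then ask the NameNode's StoragePolicySatisfier to
    // move its replicas. satisfyStoragePolicy() records an xattr on the inode;
    // the change below keeps that xattr and re-queues the file while any of
    // its blocks are still low redundant, instead of dropping it prematurely.
    dfs.setStoragePolicy(file, "COLD");
    dfs.satisfyStoragePolicy(file);
  }
}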
BlockManager.java:

@@ -4323,6 +4323,21 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Check whether the file has any low-redundancy blocks.
+   */
+  public boolean hasLowRedundancyBlocks(BlockCollection bc) {
+    boolean result = false;
+    for (BlockInfo block : bc.getBlocks()) {
+      short expected = getExpectedRedundancyNum(block);
+      final NumberReplicas n = countNodes(block);
+      if (expected > n.liveReplicas()) {
+        result = true;
+      }
+    }
+    return result;
+  }
+
   /**
    * Check sufficient redundancy of the blocks in the collection. If any block
    * is needed reconstruction, insert it into the reconstruction queue.
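A rough sketch of how the new helper could be driven from a test harness (hypothetical snippet, not part of this patch; it assumes a MiniDFSCluster with a file at the given path and ignores the namesystem locking a production caller would take):

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;

// Hypothetical helper: after stopping a datanode that holds a replica of the
// file, hasLowRedundancyBlocks() should report true until reconstruction or a
// datanode restart brings the live replica count back to the expected value.
public final class LowRedundancyCheck {
  public static boolean isLowRedundant(MiniDFSCluster cluster, String path)
      throws Exception {
    BlockManager bm = cluster.getNamesystem().getBlockManager();
    // INodeFile implements BlockCollection, so it can be handed to the helper.
    INodeFile inode = (INodeFile) cluster.getNamesystem()
        .getFSDirectory().getINode(path);
    return bm.hasLowRedundancyBlocks(inode);
  }
}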
StoragePolicySatisfier.java:

@@ -99,7 +99,10 @@ public class StoragePolicySatisfier implements Runnable {
     // Represents that, the analysis skipped due to some conditions.
     // Example conditions are if no blocks really exists in block collection or
     // if analysis is not required on ec files with unsuitable storage policies
-    BLOCKS_TARGET_PAIRING_SKIPPED;
+    BLOCKS_TARGET_PAIRING_SKIPPED,
+    // Represents that all the reported blocks satisfy the policy, but
+    // some of the blocks are low redundant.
+    FEW_LOW_REDUNDANCY_BLOCKS
   }
 
   public StoragePolicySatisfier(final Namesystem namesystem,

@@ -247,6 +250,14 @@ public class StoragePolicySatisfier implements Runnable {
       case FEW_BLOCKS_TARGETS_PAIRED:
         this.storageMovementsMonitor.add(blockCollectionID, false);
         break;
+      case FEW_LOW_REDUNDANCY_BLOCKS:
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding trackID " + blockCollectionID
+              + " back to retry queue as some of the blocks"
+              + " are low redundant.");
+        }
+        this.storageMovementNeeded.add(blockCollectionID);
+        break;
       // Just clean Xattrs
       case BLOCKS_TARGET_PAIRING_SKIPPED:
       case BLOCKS_ALREADY_SATISFIED:

@@ -347,11 +358,16 @@ public class StoragePolicySatisfier implements Runnable {
           boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
               blockInfo, expectedStorageTypes, existing, storages);
           if (computeStatus
-              && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED) {
+              && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED
+              && !blockManager.hasLowRedundancyBlocks(blockCollection)) {
             status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
           } else {
             status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
           }
+        } else {
+          if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
+            status = BlocksMovingAnalysisStatus.FEW_LOW_REDUNDANCY_BLOCKS;
+          }
         }
       }
 
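The tests added below shorten the SPS recheck interval so this retry path for low-redundancy blocks is exercised quickly. A minimal standalone sketch of just that setting, assuming defaults everywhere else; the helper class name is made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Illustrative helper: lower the SPS recheck timeout to 3 seconds so that
// files re-queued because some blocks are low redundant are re-analysed well
// within a test's timeout.
public final class SpsTestConf {
  public static Configuration create() {
    Configuration conf = new Configuration();
    conf.set(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
        "3000");
    return conf;
  }
}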
TestStoragePolicySatisfier.java:

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 import java.io.FileNotFoundException;

@@ -29,6 +30,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FSDataOutputStream;

@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;

@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -582,7 +586,9 @@ public class TestStoragePolicySatisfier {
       Assert.assertTrue("SPS should be running as "
           + "no Mover really running", running);
     } finally {
-      hdfsCluster.shutdown();
+      if (hdfsCluster != null) {
+        hdfsCluster.shutdown();
+      }
     }
   }
 
@@ -983,6 +989,100 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Test SPS for low-redundancy file blocks.
+   * 1. Create a cluster with 3 datanodes.
+   * 2. Create one file with 3 replicas.
+   * 3. Set the policy and call satisfyStoragePolicy for the file.
+   * 4. Stop the NameNode and the datanodes.
+   * 5. Start the NameNode with 2 datanodes and wait for block movement.
+   * 6. Start the third datanode.
+   * 7. The third datanode's replica should also be moved to the proper
+   * storage based on the policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .storageTypes(newtypes).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0);
+      fs.setStoragePolicy(filePath, "COLD");
+      List<DataNodeProperties> list = new ArrayList<>();
+      list.add(cluster.stopDataNode(0));
+      list.add(cluster.stopDataNode(0));
+      list.add(cluster.stopDataNode(0));
+      cluster.restartNameNodes();
+      cluster.restartDataNode(list.get(0), true);
+      cluster.restartDataNode(list.get(1), true);
+      cluster.waitActive();
+      fs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000 * 6);
+      cluster.restartDataNode(list.get(2), true);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for excess-redundancy file blocks.
+   * 1. Create a cluster with 5 datanodes.
+   * 2. Create one file with 5 replicas.
+   * 3. Set the file replication to 3.
+   * 4. Set the policy and call satisfyStoragePolicy for the file.
+   * 5. Blocks should be moved successfully.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5)
+          .storageTypes(newtypes).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0);
+      fs.setReplication(filePath, (short) 3);
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(BlockStorageMovementAttemptedItems.class));
+      fs.setStoragePolicy(filePath, "COLD");
+      fs.satisfyStoragePolicy(filePath);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+      assertFalse("Unexpected low-redundancy retry message in the log: ",
+          logs.getOutput().contains("some of the blocks are low redundant"));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
TestStoragePolicySatisfierWithStripedFile.java:

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;

@@ -27,8 +29,10 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -292,6 +296,92 @@ public class TestStoragePolicySatisfierWithStripedFile {
     }
   }
 
+  /**
+   * Test SPS for low-redundancy file blocks.
+   * 1. Create a cluster with 10 datanodes.
+   * 2. Create one striped file with the default EC policy.
+   * 3. Set the policy and call satisfyStoragePolicy for the file.
+   * 4. Stop the NameNode and the datanodes.
+   * 5. Start the NameNode with 5 datanodes and wait for block movement.
+   * 6. Start the remaining 5 datanodes.
+   * 7. All replicas should be moved to the proper storage based on the policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    long capacity = 20 * defaultStripeBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys
+        .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+        "3000");
+    initConfWithStripe(conf, defaultStripeBlockSize);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path barDir = new Path("/bar");
+      fs.mkdirs(barDir);
+      // set an EC policy on the "/bar" directory
+      fs.setErasureCodingPolicy(barDir, null);
+
+      // write a file under barDir
+      final Path fooFile = new Path("/bar/foo");
+      long fileLen = cellSize * dataBlocks;
+      DFSTestUtil.createFile(cluster.getFileSystem(), fooFile,
+          fileLen, (short) 3, 0);
+
+      // Move the file to ARCHIVE.
+      fs.setStoragePolicy(barDir, "COLD");
+      // Stop the datanodes and restart the NameNode.
+      List<DataNodeProperties> list = new ArrayList<>(numOfDatanodes);
+      for (int i = 0; i < numOfDatanodes; i++) {
+        list.add(cluster.stopDataNode(0));
+      }
+      cluster.restartNameNodes();
+      // Restart half of the datanodes.
+      for (int i = 0; i < numOfDatanodes / 2; i++) {
+        cluster.restartDataNode(list.get(i), true);
+      }
+      cluster.waitActive();
+      fs.satisfyStoragePolicy(fooFile);
+      Thread.sleep(3000 * 6);
+      // Start the remaining datanodes.
+      for (int i = numOfDatanodes - 1; i > numOfDatanodes / 2; i--) {
+        cluster.restartDataNode(list.get(i), true);
+      }
+      // verify storage types and locations.
+      waitExpectedStorageType(cluster, fooFile.toString(), fileLen,
+          StorageType.ARCHIVE, 9, 9, 60000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Tests to verify that for the given path, no blocks under the given path
    * will be scheduled for block movement as there are no available datanode