HDFS-16484. [SPS]: Fix an infinite loop bug in SPSPathIdProcessor thread (#4032)

(cherry picked from commit 45394433a1)
parent 44e662272f
commit 07dface36a
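Summary of the change, as reflected in the diff below: SPSPathIdProcessor.run() could loop forever on a single path id. If a file was deleted after satisfyStoragePolicy() had been called on it, ctxt.removeSPSHint() threw FileNotFoundException, startINode was never reset, and the thread kept retrying the same inode indefinitely. The fix catches the FileNotFoundException and clears startINode, and also introduces MAX_RETRY_COUNT (3) so that a path id that repeatedly fails is skipped with a warning instead of being retried forever.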
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -227,15 +228,18 @@ public class BlockStorageMovementNeeded {
    * ID's to process for satisfy the policy.
    */
   private class SPSPathIdProcessor implements Runnable {
+    private static final int MAX_RETRY_COUNT = 3;
 
     @Override
     public void run() {
       LOG.info("Starting SPSPathIdProcessor!.");
       Long startINode = null;
+      int retryCount = 0;
       while (ctxt.isRunning()) {
         try {
           if (!ctxt.isInSafeMode()) {
             if (startINode == null) {
+              retryCount = 0;
               startINode = ctxt.getNextSPSPath();
             } // else same id will be retried
             if (startINode == null) {
@@ -248,7 +252,12 @@ public class BlockStorageMovementNeeded {
                 pendingWorkForDirectory.get(startINode);
             if (dirPendingWorkInfo != null
                 && dirPendingWorkInfo.isDirWorkDone()) {
-              ctxt.removeSPSHint(startINode);
+              try {
+                ctxt.removeSPSHint(startINode);
+              } catch (FileNotFoundException e) {
+                // ignore if the file doesn't exist anymore
+                startINode = null;
+              }
               pendingWorkForDirectory.remove(startINode);
             }
           }
@@ -268,6 +277,11 @@ public class BlockStorageMovementNeeded {
           LOG.info("Interrupted while waiting in SPSPathIdProcessor", t);
           break;
         }
+        retryCount++;
+        if (retryCount >= MAX_RETRY_COUNT) {
+          LOG.warn("Skipping this inode {} due to too many retries.", startINode);
+          startINode = null;
+        }
       }
     }
   }
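Outside the HDFS codebase, the shape of this fix is easier to see in isolation. The following is a minimal, self-contained sketch of the same bounded-retry loop; BoundedRetryWorker, pendingPaths, and process() are hypothetical stand-ins for the SPS context calls, not real SPS APIs:

import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the bounded-retry pattern from the patch (hypothetical names).
public class BoundedRetryWorker {
  // Same cap the patch introduces as MAX_RETRY_COUNT.
  private static final int MAX_RETRY_COUNT = 3;

  private final Queue<Long> pendingPaths = new ArrayDeque<>();

  public void run() {
    Long current = null;
    int retryCount = 0;
    while (true) {
      try {
        if (current == null) {
          retryCount = 0;                // fresh item: reset the retry budget
          current = pendingPaths.poll(); // stand-in for ctxt.getNextSPSPath()
        }
        if (current == null) {
          break;                         // queue drained, stop the worker
        }
        process(current);                // stand-in for the per-path SPS work
        current = null;                  // success: pick up a new path next round
      } catch (Exception e) {
        // Before the fix, 'current' was kept forever here, so a path whose
        // processing always throws would be retried in an infinite loop.
        // (The real code also sleeps between retries.)
        retryCount++;
        if (retryCount >= MAX_RETRY_COUNT) {
          System.err.println("Skipping path " + current + " after too many retries");
          current = null;                // give up on this path and move on
        }
      }
    }
  }

  private void process(Long path) throws Exception {
    // hypothetical per-path work that may throw
  }
}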
@@ -189,6 +189,20 @@ public class TestExternalStoragePolicySatisfier {
     }
   }
 
+  private void stopExternalSps() {
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+    }
+  }
+
+  private void startExternalSps() {
+    externalSps = new StoragePolicySatisfier(getConf());
+    externalCtxt = new ExternalSPSContext(externalSps, nnc);
+
+    externalSps.init(externalCtxt);
+    externalSps.start(StoragePolicySatisfierMode.EXTERNAL);
+  }
+
   private void createCluster() throws IOException {
     getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
@@ -1343,6 +1357,45 @@ public class TestExternalStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Test SPS when a file is satisfied and then deleted before SPS is started.
+   */
+  @Test(timeout = 300000)
+  public void testSPSSatisfyAndThenDeleteFileBeforeStartSPS() throws Exception {
+    try {
+      createCluster();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE}};
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      stopExternalSps();
+
+      dfs.setStoragePolicy(new Path(FILE), COLD);
+      hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+      dfs.delete(new Path(FILE), true);
+
+      startExternalSps();
+
+      String file1 = "/testMoveToSatisfyStoragePolicy_1";
+      writeContent(file1);
+      dfs.setStoragePolicy(new Path(file1), COLD);
+      hdfsAdmin.satisfyStoragePolicy(new Path(file1));
+
+      hdfsCluster.triggerHeartbeats();
+      DFSTestUtil.waitExpectedStorageType(file1, StorageType.ARCHIVE, 3, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   /**
    * Test SPS for directory which has multilevel directories.
    */