HDFS-11264: [SPS]: Double checks to ensure that SPS/Mover are not running together. Contributed by Rakesh R.
This commit is contained in:
parent
0b360b16ab
commit
5eb24ef7e7
@ -128,6 +128,14 @@ public StoragePolicySatisfier(final Namesystem namesystem,
|
|||||||
*/
|
*/
|
||||||
public synchronized void start(boolean reconfigStart) {
|
public synchronized void start(boolean reconfigStart) {
|
||||||
isRunning = true;
|
isRunning = true;
|
||||||
|
if (checkIfMoverRunning()) {
|
||||||
|
isRunning = false;
|
||||||
|
LOG.error(
|
||||||
|
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
|
||||||
|
+ HdfsServerConstants.MOVER_ID_PATH.toString()
|
||||||
|
+ " been opened. Maybe a Mover instance is running!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (reconfigStart) {
|
if (reconfigStart) {
|
||||||
LOG.info("Starting StoragePolicySatisfier, as admin requested to "
|
LOG.info("Starting StoragePolicySatisfier, as admin requested to "
|
||||||
+ "activate it.");
|
+ "activate it.");
|
||||||
@ -211,20 +219,6 @@ private void addDropSPSWorkCommandsToAllDNs() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
boolean isMoverRunning = !checkIfMoverRunning();
|
|
||||||
synchronized (this) {
|
|
||||||
isRunning = isMoverRunning;
|
|
||||||
if (!isRunning) {
|
|
||||||
// Stopping monitor thread and clearing queues as well
|
|
||||||
this.clearQueues();
|
|
||||||
this.storageMovementsMonitor.stopGracefully();
|
|
||||||
LOG.error(
|
|
||||||
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
|
|
||||||
+ HdfsServerConstants.MOVER_ID_PATH.toString()
|
|
||||||
+ " been opened. Maybe a Mover instance is running!");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
while (namesystem.isRunning() && isRunning) {
|
while (namesystem.isRunning() && isRunning) {
|
||||||
try {
|
try {
|
||||||
if (!namesystem.isInSafeMode()) {
|
if (!namesystem.isInSafeMode()) {
|
||||||
@ -274,25 +268,34 @@ public void run() {
|
|||||||
// we want to check block movements.
|
// we want to check block movements.
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
synchronized (this) {
|
handleException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleException(Throwable t) {
|
||||||
|
// double check to avoid entering into synchronized block.
|
||||||
|
if (isRunning) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (isRunning) {
|
||||||
isRunning = false;
|
isRunning = false;
|
||||||
// Stopping monitor thread and clearing queues as well
|
// Stopping monitor thread and clearing queues as well
|
||||||
this.clearQueues();
|
this.clearQueues();
|
||||||
this.storageMovementsMonitor.stopGracefully();
|
this.storageMovementsMonitor.stopGracefully();
|
||||||
}
|
if (!namesystem.isRunning()) {
|
||||||
if (!namesystem.isRunning()) {
|
LOG.info("Stopping StoragePolicySatisfier.");
|
||||||
LOG.info("Stopping StoragePolicySatisfier.");
|
if (!(t instanceof InterruptedException)) {
|
||||||
if (!(t instanceof InterruptedException)) {
|
LOG.info("StoragePolicySatisfier received an exception"
|
||||||
LOG.info("StoragePolicySatisfier received an exception"
|
+ " while shutting down.", t);
|
||||||
+ " while shutting down.", t);
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
LOG.error("StoragePolicySatisfier thread received runtime exception. "
|
|
||||||
+ "Stopping Storage policy satisfier work", t);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.error("StoragePolicySatisfier thread received runtime exception. "
|
||||||
|
+ "Stopping Storage policy satisfier work", t);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
|
private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
|
||||||
|
@ -927,7 +927,8 @@ public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
|
|||||||
String fooDir = "/foo";
|
String fooDir = "/foo";
|
||||||
client.mkdirs(fooDir, new FsPermission((short) 777), true);
|
client.mkdirs(fooDir, new FsPermission((short) 777), true);
|
||||||
// set an EC policy on "/foo" directory
|
// set an EC policy on "/foo" directory
|
||||||
client.setErasureCodingPolicy(fooDir, null);
|
client.setErasureCodingPolicy(fooDir,
|
||||||
|
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||||
|
|
||||||
// write file to fooDir
|
// write file to fooDir
|
||||||
final String testFile = "/foo/bar";
|
final String testFile = "/foo/bar";
|
||||||
|
@ -323,6 +323,8 @@ public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
|
|||||||
conf.set(DFSConfigKeys
|
conf.set(DFSConfigKeys
|
||||||
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
|
.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
|
||||||
"3000");
|
"3000");
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||||
|
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||||
initConfWithStripe(conf, defaultStripeBlockSize);
|
initConfWithStripe(conf, defaultStripeBlockSize);
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
.numDataNodes(numOfDatanodes)
|
.numDataNodes(numOfDatanodes)
|
||||||
@ -346,7 +348,8 @@ public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
|
|||||||
Path barDir = new Path("/bar");
|
Path barDir = new Path("/bar");
|
||||||
fs.mkdirs(barDir);
|
fs.mkdirs(barDir);
|
||||||
// set an EC policy on "/bar" directory
|
// set an EC policy on "/bar" directory
|
||||||
fs.setErasureCodingPolicy(barDir, null);
|
fs.setErasureCodingPolicy(barDir,
|
||||||
|
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||||
|
|
||||||
// write file to barDir
|
// write file to barDir
|
||||||
final Path fooFile = new Path("/bar/foo");
|
final Path fooFile = new Path("/bar/foo");
|
||||||
|
Loading…
x
Reference in New Issue
Block a user