HDFS-11239: [SPS]: Check Mover file ID lease also to determine whether Mover is running. Contributed by Wei Zhou
This commit is contained in:
parent
e34331c31d
commit
9b15f5418d
|
@ -3564,7 +3564,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
BlockInfo getStoredBlock(Block block) {
|
BlockInfo getStoredBlock(Block block) {
|
||||||
return blockManager.getStoredBlock(block);
|
return blockManager.getStoredBlock(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFileOpenedForWrite(String path) {
|
||||||
|
readLock();
|
||||||
|
try {
|
||||||
|
INode inode = dir.getINode(path, FSDirectory.DirOp.READ);
|
||||||
|
INodeFile iNodeFile = INodeFile.valueOf(inode, path);
|
||||||
|
LeaseManager.Lease lease = leaseManager.getLease(iNodeFile);
|
||||||
|
return lease != null;
|
||||||
|
} catch (IOException e) {
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isInSnapshot(long blockCollectionID) {
|
public boolean isInSnapshot(long blockCollectionID) {
|
||||||
assert hasReadLock();
|
assert hasReadLock();
|
||||||
|
|
|
@ -45,4 +45,11 @@ public interface Namesystem extends RwLock, SafeMode {
|
||||||
* middle of the starting active services.
|
* middle of the starting active services.
|
||||||
*/
|
*/
|
||||||
boolean inTransitionToActive();
|
boolean inTransitionToActive();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if file is been opened for write purpose.
|
||||||
|
* @param filePath
|
||||||
|
* @return true if valid write lease exists, otherwise return false.
|
||||||
|
*/
|
||||||
|
boolean isFileOpenedForWrite(String filePath);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -151,19 +150,8 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
|
|
||||||
// Return true if a Mover instance is running
|
// Return true if a Mover instance is running
|
||||||
private boolean checkIfMoverRunning() {
|
private boolean checkIfMoverRunning() {
|
||||||
boolean ret = false;
|
String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
|
||||||
try {
|
return namesystem.isFileOpenedForWrite(moverId);
|
||||||
String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
|
|
||||||
INode inode = namesystem.getFSDirectory().getINode(
|
|
||||||
moverId, FSDirectory.DirOp.READ);
|
|
||||||
if (inode != null) {
|
|
||||||
ret = true;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.info("StoragePolicySatisfier is enabled as no Mover ID file found.");
|
|
||||||
ret = false;
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -177,7 +165,8 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
this.storageMovementsMonitor.stop();
|
this.storageMovementsMonitor.stop();
|
||||||
LOG.error(
|
LOG.error(
|
||||||
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
|
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
|
||||||
+ HdfsServerConstants.MOVER_ID_PATH.toString() + " exists");
|
+ HdfsServerConstants.MOVER_ID_PATH.toString()
|
||||||
|
+ " been opened. Maybe a Mover instance is running!");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.ReconfigurationException;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -97,29 +98,33 @@ public class TestStoragePolicySatisfier {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
createCluster();
|
createCluster();
|
||||||
// Change policy to COLD
|
doTestWhenStoragePolicySetToCOLD();
|
||||||
dfs.setStoragePolicy(new Path(file), COLD);
|
|
||||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
|
||||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
|
||||||
|
|
||||||
StorageType[][] newtypes =
|
|
||||||
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
|
|
||||||
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
|
|
||||||
storagesPerDatanode, capacity, hdfsCluster);
|
|
||||||
|
|
||||||
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
|
||||||
|
|
||||||
hdfsCluster.triggerHeartbeats();
|
|
||||||
// Wait till namenode notified about the block location details
|
|
||||||
DFSTestUtil.waitExpectedStorageType(
|
|
||||||
file, StorageType.ARCHIVE, 3, 30000, dfs);
|
|
||||||
} finally {
|
} finally {
|
||||||
shutdownCluster();
|
shutdownCluster();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
|
||||||
|
// Change policy to COLD
|
||||||
|
dfs.setStoragePolicy(new Path(file), COLD);
|
||||||
|
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||||
|
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||||
|
|
||||||
|
StorageType[][] newtypes =
|
||||||
|
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
|
||||||
|
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
|
||||||
|
storagesPerDatanode, capacity, hdfsCluster);
|
||||||
|
|
||||||
|
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||||
|
|
||||||
|
hdfsCluster.triggerHeartbeats();
|
||||||
|
// Wait till namenode notified about the block location details
|
||||||
|
DFSTestUtil.waitExpectedStorageType(
|
||||||
|
file, StorageType.ARCHIVE, 3, 30000, dfs);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testWhenStoragePolicySetToALLSSD()
|
public void testWhenStoragePolicySetToALLSSD()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -500,19 +505,78 @@ public class TestStoragePolicySatisfier {
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
|
public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
|
||||||
|
throws Exception {
|
||||||
|
boolean running;
|
||||||
|
FSDataOutputStream out = null;
|
||||||
|
try {
|
||||||
|
createCluster();
|
||||||
|
// Stop SPS
|
||||||
|
hdfsCluster.getNameNode().reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, "false");
|
||||||
|
running = hdfsCluster.getFileSystem()
|
||||||
|
.getClient().isStoragePolicySatisfierRunning();
|
||||||
|
Assert.assertFalse("SPS should stopped as configured.", running);
|
||||||
|
|
||||||
|
// Simulate the case by creating MOVER_ID file
|
||||||
|
out = hdfsCluster.getFileSystem().create(
|
||||||
|
HdfsServerConstants.MOVER_ID_PATH);
|
||||||
|
|
||||||
|
// Restart SPS
|
||||||
|
hdfsCluster.getNameNode().reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, "true");
|
||||||
|
|
||||||
|
running = hdfsCluster.getFileSystem()
|
||||||
|
.getClient().isStoragePolicySatisfierRunning();
|
||||||
|
Assert.assertFalse("SPS should not be able to run as file "
|
||||||
|
+ HdfsServerConstants.MOVER_ID_PATH + " is being hold.", running);
|
||||||
|
|
||||||
|
// Simulate Mover exists
|
||||||
|
out.close();
|
||||||
|
out = null;
|
||||||
|
hdfsCluster.getFileSystem().delete(
|
||||||
|
HdfsServerConstants.MOVER_ID_PATH, true);
|
||||||
|
|
||||||
|
// Restart SPS again
|
||||||
|
hdfsCluster.getNameNode().reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, "true");
|
||||||
|
running = hdfsCluster.getFileSystem()
|
||||||
|
.getClient().isStoragePolicySatisfierRunning();
|
||||||
|
Assert.assertTrue("SPS should be running as "
|
||||||
|
+ "Mover already exited", running);
|
||||||
|
|
||||||
|
// Check functionality after SPS restart
|
||||||
|
doTestWhenStoragePolicySetToCOLD();
|
||||||
|
} catch (ReconfigurationException e) {
|
||||||
|
throw new IOException("Exception when reconfigure "
|
||||||
|
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, e);
|
||||||
|
} finally {
|
||||||
|
if (out != null) {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
hdfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests to verify that SPS should be able to start when the Mover ID file
|
||||||
|
* is not being hold by a Mover. This can be the case when Mover exits
|
||||||
|
* ungracefully without deleting the ID file from HDFS.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testWhenMoverExitsWithoutDeleteMoverIDFile()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
createCluster();
|
createCluster();
|
||||||
// Simulate Mover by creating MOVER_ID file
|
// Simulate the case by creating MOVER_ID file
|
||||||
DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
|
DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
|
||||||
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
|
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
|
||||||
hdfsCluster.restartNameNode(true);
|
hdfsCluster.restartNameNode(true);
|
||||||
boolean running = hdfsCluster.getFileSystem()
|
boolean running = hdfsCluster.getFileSystem()
|
||||||
.getClient().isStoragePolicySatisfierRunning();
|
.getClient().isStoragePolicySatisfierRunning();
|
||||||
Assert.assertFalse("SPS should not start "
|
Assert.assertTrue("SPS should be running as "
|
||||||
+ "when a Mover instance is running", running);
|
+ "no Mover really running", running);
|
||||||
} finally {
|
} finally {
|
||||||
shutdownCluster();
|
hdfsCluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue