HDFS-8540. Mover should exit with NO_MOVE_BLOCK if no block can be moved. Contributed by surendra singh lilhore
This commit is contained in:
parent
5c111a5192
commit
9ba29f081b
|
@ -580,6 +580,9 @@ Release 2.7.1 - UNRELEASED
|
||||||
HDFS-8521. Add VisibleForTesting annotation to
|
HDFS-8521. Add VisibleForTesting annotation to
|
||||||
BlockPoolSlice#selectReplicaToDelete. (cmccabe)
|
BlockPoolSlice#selectReplicaToDelete. (cmccabe)
|
||||||
|
|
||||||
|
HDFS-8540. Mover should exit with NO_MOVE_BLOCK if no block can be moved.
|
||||||
|
(surendra singh lilhore via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
@ -163,8 +162,7 @@ public class Mover {
|
||||||
private ExitStatus run() {
|
private ExitStatus run() {
|
||||||
try {
|
try {
|
||||||
init();
|
init();
|
||||||
boolean hasRemaining = new Processor().processNamespace();
|
return new Processor().processNamespace().getExitStatus();
|
||||||
return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
|
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
return ExitStatus.ILLEGAL_ARGUMENTS;
|
return ExitStatus.ILLEGAL_ARGUMENTS;
|
||||||
|
@ -262,11 +260,11 @@ public class Mover {
|
||||||
* @return whether there is still remaining migration work for the next
|
* @return whether there is still remaining migration work for the next
|
||||||
* round
|
* round
|
||||||
*/
|
*/
|
||||||
private boolean processNamespace() throws IOException {
|
private Result processNamespace() throws IOException {
|
||||||
getSnapshottableDirs();
|
getSnapshottableDirs();
|
||||||
boolean hasRemaining = false;
|
Result result = new Result();
|
||||||
for (Path target : targetPaths) {
|
for (Path target : targetPaths) {
|
||||||
hasRemaining |= processPath(target.toUri().getPath());
|
processPath(target.toUri().getPath(), result);
|
||||||
}
|
}
|
||||||
// wait for pending move to finish and retry the failed migration
|
// wait for pending move to finish and retry the failed migration
|
||||||
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
|
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
|
||||||
|
@ -282,16 +280,15 @@ public class Mover {
|
||||||
// Reset retry count if no failure.
|
// Reset retry count if no failure.
|
||||||
retryCount.set(0);
|
retryCount.set(0);
|
||||||
}
|
}
|
||||||
hasRemaining |= hasFailed;
|
result.updateHasRemaining(hasFailed);
|
||||||
return hasRemaining;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return whether there is still remaing migration work for the next
|
* @return whether there is still remaing migration work for the next
|
||||||
* round
|
* round
|
||||||
*/
|
*/
|
||||||
private boolean processPath(String fullPath) {
|
private void processPath(String fullPath, Result result) {
|
||||||
boolean hasRemaining = false;
|
|
||||||
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
|
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
|
||||||
final DirectoryListing children;
|
final DirectoryListing children;
|
||||||
try {
|
try {
|
||||||
|
@ -299,70 +296,68 @@ public class Mover {
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
LOG.warn("Failed to list directory " + fullPath
|
LOG.warn("Failed to list directory " + fullPath
|
||||||
+ ". Ignore the directory and continue.", e);
|
+ ". Ignore the directory and continue.", e);
|
||||||
return hasRemaining;
|
return;
|
||||||
}
|
}
|
||||||
if (children == null) {
|
if (children == null) {
|
||||||
return hasRemaining;
|
return;
|
||||||
}
|
}
|
||||||
for (HdfsFileStatus child : children.getPartialListing()) {
|
for (HdfsFileStatus child : children.getPartialListing()) {
|
||||||
hasRemaining |= processRecursively(fullPath, child);
|
processRecursively(fullPath, child, result);
|
||||||
}
|
}
|
||||||
if (children.hasMore()) {
|
if (children.hasMore()) {
|
||||||
lastReturnedName = children.getLastName();
|
lastReturnedName = children.getLastName();
|
||||||
} else {
|
} else {
|
||||||
return hasRemaining;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return whether the migration requires next round */
|
/** @return whether the migration requires next round */
|
||||||
private boolean processRecursively(String parent, HdfsFileStatus status) {
|
private void processRecursively(String parent, HdfsFileStatus status,
|
||||||
|
Result result) {
|
||||||
String fullPath = status.getFullName(parent);
|
String fullPath = status.getFullName(parent);
|
||||||
boolean hasRemaining = false;
|
|
||||||
if (status.isDir()) {
|
if (status.isDir()) {
|
||||||
if (!fullPath.endsWith(Path.SEPARATOR)) {
|
if (!fullPath.endsWith(Path.SEPARATOR)) {
|
||||||
fullPath = fullPath + Path.SEPARATOR;
|
fullPath = fullPath + Path.SEPARATOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
hasRemaining = processPath(fullPath);
|
processPath(fullPath, result);
|
||||||
// process snapshots if this is a snapshottable directory
|
// process snapshots if this is a snapshottable directory
|
||||||
if (snapshottableDirs.contains(fullPath)) {
|
if (snapshottableDirs.contains(fullPath)) {
|
||||||
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
|
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
|
||||||
hasRemaining |= processPath(dirSnapshot);
|
processPath(dirSnapshot, result);
|
||||||
}
|
}
|
||||||
} else if (!status.isSymlink()) { // file
|
} else if (!status.isSymlink()) { // file
|
||||||
try {
|
try {
|
||||||
if (!isSnapshotPathInCurrent(fullPath)) {
|
if (!isSnapshotPathInCurrent(fullPath)) {
|
||||||
// the full path is a snapshot path but it is also included in the
|
// the full path is a snapshot path but it is also included in the
|
||||||
// current directory tree, thus ignore it.
|
// current directory tree, thus ignore it.
|
||||||
hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status);
|
processFile(fullPath, (HdfsLocatedFileStatus) status, result);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to check the status of " + parent
|
LOG.warn("Failed to check the status of " + parent
|
||||||
+ ". Ignore it and continue.", e);
|
+ ". Ignore it and continue.", e);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return hasRemaining;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return true if it is necessary to run another round of migration */
|
/** @return true if it is necessary to run another round of migration */
|
||||||
private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
|
private void processFile(String fullPath, HdfsLocatedFileStatus status,
|
||||||
|
Result result) {
|
||||||
final byte policyId = status.getStoragePolicy();
|
final byte policyId = status.getStoragePolicy();
|
||||||
// currently we ignore files with unspecified storage policy
|
// currently we ignore files with unspecified storage policy
|
||||||
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
|
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
|
||||||
if (policy == null) {
|
if (policy == null) {
|
||||||
LOG.warn("Failed to get the storage policy of file " + fullPath);
|
LOG.warn("Failed to get the storage policy of file " + fullPath);
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
final List<StorageType> types = policy.chooseStorageTypes(
|
final List<StorageType> types = policy.chooseStorageTypes(
|
||||||
status.getReplication());
|
status.getReplication());
|
||||||
|
|
||||||
final LocatedBlocks locatedBlocks = status.getBlockLocations();
|
final LocatedBlocks locatedBlocks = status.getBlockLocations();
|
||||||
boolean hasRemaining = false;
|
|
||||||
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
|
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
|
||||||
List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
|
List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
|
||||||
for (int i = 0; i < lbs.size(); i++) {
|
for (int i = 0; i < lbs.size(); i++) {
|
||||||
|
@ -375,12 +370,15 @@ public class Mover {
|
||||||
lb.getStorageTypes());
|
lb.getStorageTypes());
|
||||||
if (!diff.removeOverlap(true)) {
|
if (!diff.removeOverlap(true)) {
|
||||||
if (scheduleMoves4Block(diff, lb)) {
|
if (scheduleMoves4Block(diff, lb)) {
|
||||||
hasRemaining |= (diff.existing.size() > 1 &&
|
result.updateHasRemaining(diff.existing.size() > 1
|
||||||
diff.expected.size() > 1);
|
&& diff.expected.size() > 1);
|
||||||
|
// One block scheduled successfully, set noBlockMoved to false
|
||||||
|
result.setNoBlockMoved(false);
|
||||||
|
} else {
|
||||||
|
result.updateHasRemaining(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return hasRemaining;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
|
boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
|
||||||
|
@ -711,6 +709,45 @@ public class Mover {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class Result {
|
||||||
|
|
||||||
|
private boolean hasRemaining;
|
||||||
|
private boolean noBlockMoved;
|
||||||
|
|
||||||
|
Result() {
|
||||||
|
hasRemaining = false;
|
||||||
|
noBlockMoved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isHasRemaining() {
|
||||||
|
return hasRemaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isNoBlockMoved() {
|
||||||
|
return noBlockMoved;
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateHasRemaining(boolean hasRemaining) {
|
||||||
|
this.hasRemaining |= hasRemaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setNoBlockMoved(boolean noBlockMoved) {
|
||||||
|
this.noBlockMoved = noBlockMoved;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return SUCCESS if all moves are success and there is no remaining move.
|
||||||
|
* Return NO_MOVE_BLOCK if there moves available but all the moves
|
||||||
|
* cannot be scheduled. Otherwise, return IN_PROGRESS since there
|
||||||
|
* must be some remaining moves.
|
||||||
|
*/
|
||||||
|
ExitStatus getExitStatus() {
|
||||||
|
return !isHasRemaining() ? ExitStatus.SUCCESS
|
||||||
|
: isNoBlockMoved() ? ExitStatus.NO_MOVE_BLOCK
|
||||||
|
: ExitStatus.IN_PROGRESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Run a Mover in command line.
|
* Run a Mover in command line.
|
||||||
*
|
*
|
||||||
|
|
|
@ -328,6 +328,35 @@ public class TestMover {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
|
||||||
|
// HDFS-8147
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[][] { { StorageType.DISK }, { StorageType.DISK },
|
||||||
|
{ StorageType.DISK } }).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final String file = "/testMoveWhenStoragePolicyNotSatisfying";
|
||||||
|
// write to DISK
|
||||||
|
final FSDataOutputStream out = dfs.create(new Path(file));
|
||||||
|
out.writeChars("testMoveWhenStoragePolicyNotSatisfying");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// move to ARCHIVE
|
||||||
|
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||||
|
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||||
|
new String[] { "-p", file.toString() });
|
||||||
|
int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
|
||||||
|
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMoverFailedRetry() throws Exception {
|
public void testMoverFailedRetry() throws Exception {
|
||||||
// HDFS-8147
|
// HDFS-8147
|
||||||
|
|
|
@ -222,7 +222,7 @@ public class TestStorageMover {
|
||||||
verify(true);
|
verify(true);
|
||||||
|
|
||||||
setStoragePolicy();
|
setStoragePolicy();
|
||||||
migrate();
|
migrate(ExitStatus.SUCCESS);
|
||||||
verify(true);
|
verify(true);
|
||||||
} finally {
|
} finally {
|
||||||
if (shutdown) {
|
if (shutdown) {
|
||||||
|
@ -253,8 +253,8 @@ public class TestStorageMover {
|
||||||
/**
|
/**
|
||||||
* Run the migration tool.
|
* Run the migration tool.
|
||||||
*/
|
*/
|
||||||
void migrate() throws Exception {
|
void migrate(ExitStatus expectedExitCode) throws Exception {
|
||||||
runMover();
|
runMover(expectedExitCode);
|
||||||
Thread.sleep(5000); // let the NN finish deletion
|
Thread.sleep(5000); // let the NN finish deletion
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,14 +270,14 @@ public class TestStorageMover {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runMover() throws Exception {
|
private void runMover(ExitStatus expectedExitCode) throws Exception {
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||||
Map<URI, List<Path>> nnMap = Maps.newHashMap();
|
Map<URI, List<Path>> nnMap = Maps.newHashMap();
|
||||||
for (URI nn : namenodes) {
|
for (URI nn : namenodes) {
|
||||||
nnMap.put(nn, null);
|
nnMap.put(nn, null);
|
||||||
}
|
}
|
||||||
int result = Mover.run(nnMap, conf);
|
int result = Mover.run(nnMap, conf);
|
||||||
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
|
Assert.assertEquals(expectedExitCode.getExitCode(), result);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyNamespace() throws Exception {
|
private void verifyNamespace() throws Exception {
|
||||||
|
@ -583,7 +583,7 @@ public class TestStorageMover {
|
||||||
try {
|
try {
|
||||||
banner("start data migration");
|
banner("start data migration");
|
||||||
test.setStoragePolicy(); // set /foo to COLD
|
test.setStoragePolicy(); // set /foo to COLD
|
||||||
test.migrate();
|
test.migrate(ExitStatus.SUCCESS);
|
||||||
|
|
||||||
// make sure the under construction block has not been migrated
|
// make sure the under construction block has not been migrated
|
||||||
LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
|
LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
|
||||||
|
@ -633,7 +633,7 @@ public class TestStorageMover {
|
||||||
try {
|
try {
|
||||||
test.runBasicTest(false);
|
test.runBasicTest(false);
|
||||||
pathPolicyMap.moveAround(test.dfs);
|
pathPolicyMap.moveAround(test.dfs);
|
||||||
test.migrate();
|
test.migrate(ExitStatus.SUCCESS);
|
||||||
|
|
||||||
test.verify(true);
|
test.verify(true);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -723,7 +723,7 @@ public class TestStorageMover {
|
||||||
//test move a hot file to warm
|
//test move a hot file to warm
|
||||||
final Path file1 = new Path(pathPolicyMap.hot, "file1");
|
final Path file1 = new Path(pathPolicyMap.hot, "file1");
|
||||||
test.dfs.rename(file1, pathPolicyMap.warm);
|
test.dfs.rename(file1, pathPolicyMap.warm);
|
||||||
test.migrate();
|
test.migrate(ExitStatus.NO_MOVE_BLOCK);
|
||||||
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
|
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
|
||||||
} finally {
|
} finally {
|
||||||
test.shutdownCluster();
|
test.shutdownCluster();
|
||||||
|
@ -781,7 +781,7 @@ public class TestStorageMover {
|
||||||
{ //test move a cold file to warm
|
{ //test move a cold file to warm
|
||||||
final Path file1 = new Path(pathPolicyMap.cold, "file1");
|
final Path file1 = new Path(pathPolicyMap.cold, "file1");
|
||||||
test.dfs.rename(file1, pathPolicyMap.warm);
|
test.dfs.rename(file1, pathPolicyMap.warm);
|
||||||
test.migrate();
|
test.migrate(ExitStatus.SUCCESS);
|
||||||
test.verify(true);
|
test.verify(true);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue