HDFS-14343. RBF: Fix renaming folders spread across multiple subclusters. Contributed by Ayush Saxena.
This commit is contained in:
parent
2a2d5eb441
commit
f539e2a4ee
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
@ -88,6 +89,8 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -466,9 +469,13 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("rename",
|
RemoteMethod method = new RemoteMethod("rename",
|
||||||
new Class<?>[] {String.class, String.class},
|
new Class<?>[] {String.class, String.class},
|
||||||
new RemoteParam(), dstParam);
|
new RemoteParam(), dstParam);
|
||||||
|
if (isMultiDestDirectory(src)) {
|
||||||
|
return rpcClient.invokeAll(locs, method);
|
||||||
|
} else {
|
||||||
return rpcClient.invokeSequential(locs, method, Boolean.class,
|
return rpcClient.invokeSequential(locs, method, Boolean.class,
|
||||||
Boolean.TRUE);
|
Boolean.TRUE);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void rename2(final String src, final String dst,
|
public void rename2(final String src, final String dst,
|
||||||
|
@ -488,8 +495,12 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
RemoteMethod method = new RemoteMethod("rename2",
|
RemoteMethod method = new RemoteMethod("rename2",
|
||||||
new Class<?>[] {String.class, String.class, options.getClass()},
|
new Class<?>[] {String.class, String.class, options.getClass()},
|
||||||
new RemoteParam(), dstParam, options);
|
new RemoteParam(), dstParam, options);
|
||||||
|
if (isMultiDestDirectory(src)) {
|
||||||
|
rpcClient.invokeConcurrent(locs, method);
|
||||||
|
} else {
|
||||||
rpcClient.invokeSequential(locs, method, null, null);
|
rpcClient.invokeSequential(locs, method, null, null);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void concat(String trg, String[] src) throws IOException {
|
public void concat(String trg, String[] src) throws IOException {
|
||||||
|
@ -1857,4 +1868,34 @@ public class RouterClientProtocol implements ClientProtocol {
|
||||||
}
|
}
|
||||||
return modTime;
|
return modTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the path is a directory and is supposed to be present in all
|
||||||
|
* subclusters.
|
||||||
|
* @param src the source path
|
||||||
|
* @return true if the path is directory and is supposed to be present in all
|
||||||
|
* subclusters else false in all other scenarios.
|
||||||
|
* @throws IOException if unable to get the file status.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isMultiDestDirectory(String src) throws IOException {
|
||||||
|
try {
|
||||||
|
if (rpcServer.isPathAll(src)) {
|
||||||
|
List<RemoteLocation> locations;
|
||||||
|
locations = rpcServer.getLocationsForPath(src, false);
|
||||||
|
RemoteMethod method = new RemoteMethod("getFileInfo",
|
||||||
|
new Class<?>[] {String.class}, new RemoteParam());
|
||||||
|
HdfsFileStatus fileStatus = rpcClient.invokeSequential(locations,
|
||||||
|
method, HdfsFileStatus.class, null);
|
||||||
|
if (fileStatus != null) {
|
||||||
|
return fileStatus.isDirectory();
|
||||||
|
} else {
|
||||||
|
LOG.debug("The destination {} doesn't exist.", src);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (UnresolvedPathException e) {
|
||||||
|
LOG.debug("The destination {} is a symlink.", src);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import java.util.TreeSet;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
@ -427,6 +428,117 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
|
||||||
null, null, Arrays.asList("ns0", "ns1"));
|
null, null, Arrays.asList("ns0", "ns1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsMultiDestDir() throws Exception {
|
||||||
|
RouterClientProtocol client =
|
||||||
|
routerContext.getRouter().getRpcServer().getClientProtocolModule();
|
||||||
|
setupOrderMountPath(DestinationOrder.HASH_ALL);
|
||||||
|
// Should be true only for directory and false for all other cases.
|
||||||
|
assertTrue(client.isMultiDestDirectory("/mount/dir"));
|
||||||
|
assertFalse(client.isMultiDestDirectory("/mount/nodir"));
|
||||||
|
assertFalse(client.isMultiDestDirectory("/mount/dir/file"));
|
||||||
|
routerFs.createSymlink(new Path("/mount/dir/file"),
|
||||||
|
new Path("/mount/dir/link"), true);
|
||||||
|
assertFalse(client.isMultiDestDirectory("/mount/dir/link"));
|
||||||
|
routerFs.createSymlink(new Path("/mount/dir/dir"),
|
||||||
|
new Path("/mount/dir/linkDir"), true);
|
||||||
|
assertFalse(client.isMultiDestDirectory("/mount/dir/linkDir"));
|
||||||
|
resetTestEnvironment();
|
||||||
|
// Test single directory destination. Should be false for the directory.
|
||||||
|
setupOrderMountPath(DestinationOrder.HASH);
|
||||||
|
assertFalse(client.isMultiDestDirectory("/mount/dir"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenameMultipleDestDirectories() throws Exception {
|
||||||
|
// Test renaming directories using rename API.
|
||||||
|
verifyRenameOnMultiDestDirectories(DestinationOrder.HASH_ALL, false);
|
||||||
|
resetTestEnvironment();
|
||||||
|
verifyRenameOnMultiDestDirectories(DestinationOrder.RANDOM, false);
|
||||||
|
resetTestEnvironment();
|
||||||
|
verifyRenameOnMultiDestDirectories(DestinationOrder.SPACE, false);
|
||||||
|
resetTestEnvironment();
|
||||||
|
// Test renaming directories using rename2 API.
|
||||||
|
verifyRenameOnMultiDestDirectories(DestinationOrder.HASH_ALL, true);
|
||||||
|
resetTestEnvironment();
|
||||||
|
verifyRenameOnMultiDestDirectories(DestinationOrder.RANDOM, true);
|
||||||
|
resetTestEnvironment();
|
||||||
|
verifyRenameOnMultiDestDirectories(DestinationOrder.SPACE, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify rename operation on directories in case of multiple
|
||||||
|
* destinations.
|
||||||
|
* @param order order to be followed by the mount entry.
|
||||||
|
* @param isRename2 true if the verification is to be done using rename2(..)
|
||||||
|
* method.
|
||||||
|
* @throws Exception on account of any exception during test execution.
|
||||||
|
*/
|
||||||
|
private void verifyRenameOnMultiDestDirectories(DestinationOrder order,
|
||||||
|
boolean isRename2) throws Exception {
|
||||||
|
setupOrderMountPath(order);
|
||||||
|
Path src = new Path("/mount/dir/dir");
|
||||||
|
Path nnSrc = new Path("/tmp/dir/dir");
|
||||||
|
Path dst = new Path("/mount/dir/subdir");
|
||||||
|
Path nnDst = new Path("/tmp/dir/subdir");
|
||||||
|
Path fileSrc = new Path("/mount/dir/dir/file");
|
||||||
|
Path nnFileSrc = new Path("/tmp/dir/dir/file");
|
||||||
|
Path fileDst = new Path("/mount/dir/subdir/file");
|
||||||
|
Path nnFileDst = new Path("/tmp/dir/subdir/file");
|
||||||
|
DFSTestUtil.createFile(routerFs, fileSrc, 100L, (short) 1, 1024L);
|
||||||
|
if (isRename2) {
|
||||||
|
routerFs.rename(src, dst, Rename.NONE);
|
||||||
|
} else {
|
||||||
|
assertTrue(routerFs.rename(src, dst));
|
||||||
|
}
|
||||||
|
assertTrue(nnFs0.exists(nnDst));
|
||||||
|
assertTrue(nnFs1.exists(nnDst));
|
||||||
|
assertFalse(nnFs0.exists(nnSrc));
|
||||||
|
assertFalse(nnFs1.exists(nnSrc));
|
||||||
|
assertFalse(routerFs.exists(fileSrc));
|
||||||
|
assertTrue(routerFs.exists(fileDst));
|
||||||
|
assertTrue(nnFs0.exists(nnFileDst) || nnFs1.exists(nnFileDst));
|
||||||
|
assertFalse(nnFs0.exists(nnFileSrc) || nnFs1.exists(nnFileSrc));
|
||||||
|
|
||||||
|
// Verify rename file.
|
||||||
|
Path fileRenamed = new Path("/mount/dir/subdir/renamedFile");
|
||||||
|
Path nnFileRenamed = new Path("/tmp/dir/subdir/renamedFile");
|
||||||
|
if (isRename2) {
|
||||||
|
routerFs.rename(fileDst, fileRenamed, Rename.NONE);
|
||||||
|
} else {
|
||||||
|
assertTrue(routerFs.rename(fileDst, fileRenamed));
|
||||||
|
}
|
||||||
|
assertTrue(routerFs.exists(fileRenamed));
|
||||||
|
assertFalse(routerFs.exists(fileDst));
|
||||||
|
assertTrue(nnFs0.exists(nnFileRenamed) || nnFs1.exists(nnFileRenamed));
|
||||||
|
assertFalse(nnFs0.exists(nnFileDst) || nnFs1.exists(nnFileDst));
|
||||||
|
|
||||||
|
// Verify rename when one source directory is not present.
|
||||||
|
Path dst1 = new Path("/mount/dir/renameddir");
|
||||||
|
Path nnDst1 = new Path("/tmp/dir/renameddir");
|
||||||
|
nnFs1.delete(nnDst, true);
|
||||||
|
if (isRename2) {
|
||||||
|
routerFs.rename(dst, dst1, Rename.NONE);
|
||||||
|
} else {
|
||||||
|
assertTrue(routerFs.rename(dst, dst1));
|
||||||
|
}
|
||||||
|
assertTrue(nnFs0.exists(nnDst1));
|
||||||
|
assertFalse(nnFs0.exists(nnDst));
|
||||||
|
|
||||||
|
// Verify rename when one destination directory is already present.
|
||||||
|
Path src1 = new Path("/mount/dir");
|
||||||
|
Path dst2 = new Path("/mount/OneDest");
|
||||||
|
Path nnDst2 = new Path("/tmp/OneDest");
|
||||||
|
nnFs0.mkdirs(nnDst2);
|
||||||
|
if (isRename2) {
|
||||||
|
routerFs.rename(src1, dst2, Rename.NONE);
|
||||||
|
} else {
|
||||||
|
assertTrue(routerFs.rename(src1, dst2));
|
||||||
|
}
|
||||||
|
assertTrue(nnFs0.exists(nnDst2));
|
||||||
|
assertTrue(nnFs1.exists(nnDst2));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generic test for getting the destination subcluster.
|
* Generic test for getting the destination subcluster.
|
||||||
* @param order DestinationOrder of the mount point.
|
* @param order DestinationOrder of the mount point.
|
||||||
|
|
Loading…
Reference in New Issue