() {
+ public T get() throws InterruptedException, ExecutionException {
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
+ }
+ return super.get();
+ }
+ };
+ return returnFuture;
+ }
+
+ /**
+ * Renames Path src to Path dst
+ *
+ * - Fails if src is a file and dst is a directory.
+ *
- Fails if src is a directory and dst is a file.
+ *
- Fails if the parent of dst does not exist or is a file.
+ *
+ *
+ * If OVERWRITE option is not passed as an argument, rename fails if the dst
+ * already exists.
+ *
+ * If OVERWRITE option is passed as an argument, rename overwrites the dst if
+ * it is a file or an empty directory. Rename fails if dst is a non-empty
+ * directory.
+ *
+ * Note that atomicity of rename is dependent on the file system
+ * implementation. Please refer to the file system documentation for details.
+ * This default implementation is non atomic.
+ *
+ * @param src
+ * path to be renamed
+ * @param dst
+ * new path after rename
+ * @throws IOException
+ * on failure
+ * @return an instance of Future, #get of which is invoked to wait for
+ * asynchronous call being finished.
+ */
+ public Future rename(Path src, Path dst,
+ final Options.Rename... options) throws IOException {
+ dfs.getFsStatistics().incrementWriteOps(1);
+
+ final Path absSrc = dfs.fixRelativePart(src);
+ final Path absDst = dfs.fixRelativePart(dst);
+
+ final boolean isAsync = Client.isAsynchronousMode();
+ Client.setAsynchronousMode(true);
+ try {
+ dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
+ options);
+ return getReturnValue();
+ } finally {
+ Client.setAsynchronousMode(isAsync);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 27881d7be6b..2ffe11a206b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@@ -205,7 +206,7 @@ public class DistributedFileSystem extends FileSystem {
* @return path component of {file}
* @throws IllegalArgumentException if URI does not belong to this DFS
*/
- private String getPathName(Path file) {
+ String getPathName(Path file) {
checkPath(file);
String result = file.toUri().getPath();
if (!DFSUtilClient.isValidName(result)) {
@@ -2479,4 +2480,23 @@ public class DistributedFileSystem extends FileSystem {
}
return ret;
}
+
+ private final AsyncDistributedFileSystem adfs =
+ new AsyncDistributedFileSystem(this);
+
+ /** @return an {@link AsyncDistributedFileSystem} object. */
+ @Unstable
+ public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
+ return adfs;
+ }
+
+ @Override
+ protected Path fixRelativePart(Path p) {
+ return super.fixRelativePart(p);
+ }
+
+ Statistics getFsStatistics() {
+ return statistics;
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 6aeed28d3bb..75fba213409 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,11 +24,14 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
+import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -152,13 +155,14 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
@@ -170,7 +174,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -182,12 +188,9 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
-import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
- .EncryptionZoneProto;
-
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@@ -198,6 +201,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
+ private static final ThreadLocal>
+ RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -230,6 +235,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
+ @SuppressWarnings("unchecked")
+ @Unstable
+ public static Callable getReturnValueCallback() {
+ return (Callable) RETURN_VALUE_CALLBACK.get();
+ }
+
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -465,6 +476,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
RenameRequestProto req = RenameRequestProto.newBuilder()
.setSrc(src)
.setDst(dst).build();
+
try {
return rpcProxy.rename(null, req).getResult();
} catch (ServiceException e) {
@@ -489,7 +501,22 @@ public class ClientNamenodeProtocolTranslatorPB implements
setDst(dst).setOverwriteDest(overwrite).
build();
try {
- rpcProxy.rename2(null, req);
+ if (Client.isAsynchronousMode()) {
+ rpcProxy.rename2(null, req);
+
+ final Callable returnMessageCallback = ProtobufRpcEngine
+ .getReturnMessageCallback();
+ Callable callBack = new Callable() {
+ @Override
+ public Void call() throws Exception {
+ returnMessageCallback.call();
+ return null;
+ }
+ };
+ RETURN_VALUE_CALLBACK.set(callBack);
+ } else {
+ rpcProxy.rename2(null, req);
+ }
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
new file mode 100644
index 00000000000..9322e1afbb3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAsyncDFSRename {
+ final Path asyncRenameDir = new Path("/test/async_rename/");
+ public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+ final private static Configuration CONF = new HdfsConfiguration();
+
+ final private static String GROUP1_NAME = "group1";
+ final private static String GROUP2_NAME = "group2";
+ final private static String USER1_NAME = "user1";
+ private static final UserGroupInformation USER1;
+
+ private MiniDFSCluster gCluster;
+
+ static {
+ // explicitly turn on permission checking
+ CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map u2g_map = new HashMap(1);
+ u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+ DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+ // Initiate all four users
+ USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+ GROUP1_NAME, GROUP2_NAME });
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+ gCluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (gCluster != null) {
+ gCluster.shutdown();
+ gCluster = null;
+ }
+ }
+
+ static int countLease(MiniDFSCluster cluster) {
+ return TestDFSRename.countLease(cluster);
+ }
+
+ void list(DistributedFileSystem dfs, String name) throws IOException {
+ FileSystem.LOG.info("\n\n" + name);
+ for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+ FileSystem.LOG.info("" + s.getPath());
+ }
+ }
+
+ static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+ DataOutputStream a_out = dfs.create(f);
+ a_out.writeBytes("something");
+ a_out.close();
+ }
+
+ /**
+ * Check the blocks of dst file are cleaned after rename with overwrite
+ * Restart NN to check the rename successfully
+ */
+ @Test
+ public void testAsyncRenameWithOverwrite() throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ replFactor).build();
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+
+ try {
+
+ long fileLen = blockSize * 3;
+ String src = "/foo/src";
+ String dst = "/foo/dst";
+ String src2 = "/foo/src2";
+ String dst2 = "/foo/dst2";
+ Path srcPath = new Path(src);
+ Path dstPath = new Path(dst);
+ Path srcPath2 = new Path(src2);
+ Path dstPath2 = new Path(dst2);
+
+ DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+ LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), dst, 0, fileLen);
+ LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), dst2, 0, fileLen);
+ BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+ .getBlockManager();
+ assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) != null);
+ assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) != null);
+
+ Future retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+ Future retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+ retVal1.get();
+ retVal2.get();
+
+ assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) == null);
+ assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+ .getLocalBlock()) == null);
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+ assertFalse(dfs.exists(srcPath));
+ assertTrue(dfs.exists(dstPath));
+ assertFalse(dfs.exists(srcPath2));
+ assertTrue(dfs.exists(dstPath2));
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
+ final Path renameDir = new Path(
+ "/test/concurrent_reanme_with_overwrite_dir/");
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ int count = 1000;
+
+ try {
+ long fileLen = blockSize * 3;
+ assertTrue(dfs.mkdirs(renameDir));
+
+ Map> returnFutures = new HashMap>();
+
+ // concurrently invoking many rename
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+ Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ }
+
+ // wait for completing the calls
+ for (int i = 0; i < count; i++) {
+ returnFutures.get(i).get();
+ }
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+
+ // very the src dir should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ assertFalse(dfs.exists(src));
+ assertTrue(dfs.exists(dst));
+ }
+ } finally {
+ dfs.delete(renameDir, true);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testAsyncRenameWithException() throws Exception {
+ FileSystem rootFs = FileSystem.get(CONF);
+ final Path renameDir = new Path("/test/async_rename_exception/");
+ final Path src = new Path(renameDir, "src");
+ final Path dst = new Path(renameDir, "dst");
+ rootFs.mkdirs(src);
+
+ AsyncDistributedFileSystem adfs = USER1
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public AsyncDistributedFileSystem run() throws Exception {
+ return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+ }
+ });
+
+ try {
+ Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFuture.get();
+ } catch (ExecutionException e) {
+ checkPermissionDenied(e, src);
+ }
+ }
+
+ private void checkPermissionDenied(final Exception e, final Path dir) {
+ assertTrue(e.getCause() instanceof ExecutionException);
+ assertTrue("Permission denied messages must carry AccessControlException",
+ e.getMessage().contains("AccessControlException"));
+ assertTrue("Permission denied messages must carry the username", e
+ .getMessage().contains(USER1_NAME));
+ assertTrue("Permission denied messages must carry the path parent", e
+ .getMessage().contains(dir.getParent().toUri().getPath()));
+ }
+}
\ No newline at end of file