() {
- public T get() throws InterruptedException, ExecutionException {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
- }
- return super.get();
- }
- };
- return returnFuture;
- }
-
- /**
- * Renames Path src to Path dst
- *
- * - Fails if src is a file and dst is a directory.
- *
- Fails if src is a directory and dst is a file.
- *
- Fails if the parent of dst does not exist or is a file.
- *
- *
- * If OVERWRITE option is not passed as an argument, rename fails if the dst
- * already exists.
- *
- * If OVERWRITE option is passed as an argument, rename overwrites the dst if
- * it is a file or an empty directory. Rename fails if dst is a non-empty
- * directory.
- *
- * Note that atomicity of rename is dependent on the file system
- * implementation. Please refer to the file system documentation for details.
- * This default implementation is non atomic.
- *
- * @param src
- * path to be renamed
- * @param dst
- * new path after rename
- * @throws IOException
- * on failure
- * @return an instance of Future, #get of which is invoked to wait for
- * asynchronous call being finished.
- */
- public Future rename(Path src, Path dst,
- final Options.Rename... options) throws IOException {
- dfs.getFsStatistics().incrementWriteOps(1);
-
- final Path absSrc = dfs.fixRelativePart(src);
- final Path absDst = dfs.fixRelativePart(dst);
-
- final boolean isAsync = Client.isAsynchronousMode();
- Client.setAsynchronousMode(true);
- try {
- dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
- options);
- return getReturnValue();
- } finally {
- Client.setAsynchronousMode(isAsync);
- }
- }
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 2ffe11a206b..27881d7be6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,7 +31,6 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@@ -206,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
* @return path component of {file}
* @throws IllegalArgumentException if URI does not belong to this DFS
*/
- String getPathName(Path file) {
+ private String getPathName(Path file) {
checkPath(file);
String result = file.toUri().getPath();
if (!DFSUtilClient.isValidName(result)) {
@@ -2480,23 +2479,4 @@ public class DistributedFileSystem extends FileSystem {
}
return ret;
}
-
- private final AsyncDistributedFileSystem adfs =
- new AsyncDistributedFileSystem(this);
-
- /** @return an {@link AsyncDistributedFileSystem} object. */
- @Unstable
- public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
- return adfs;
- }
-
- @Override
- protected Path fixRelativePart(Path p) {
- return super.fixRelativePart(p);
- }
-
- Statistics getFsStatistics() {
- return statistics;
- }
-
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 75fba213409..6aeed28d3bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,14 +24,11 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
-import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -155,14 +152,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
@@ -174,9 +170,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -188,9 +182,12 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
+import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
+ .EncryptionZoneProto;
+
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@@ -201,8 +198,6 @@ import com.google.protobuf.ServiceException;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
- private static final ThreadLocal>
- RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -235,12 +230,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
- @SuppressWarnings("unchecked")
- @Unstable
- public static Callable getReturnValueCallback() {
- return (Callable) RETURN_VALUE_CALLBACK.get();
- }
-
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -476,7 +465,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
RenameRequestProto req = RenameRequestProto.newBuilder()
.setSrc(src)
.setDst(dst).build();
-
try {
return rpcProxy.rename(null, req).getResult();
} catch (ServiceException e) {
@@ -501,22 +489,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
setDst(dst).setOverwriteDest(overwrite).
build();
try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.rename2(null, req);
-
- final Callable returnMessageCallback = ProtobufRpcEngine
- .getReturnMessageCallback();
- Callable callBack = new Callable() {
- @Override
- public Void call() throws Exception {
- returnMessageCallback.call();
- return null;
- }
- };
- RETURN_VALUE_CALLBACK.set(callBack);
- } else {
- rpcProxy.rename2(null, req);
- }
+ rpcProxy.rename2(null, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
deleted file mode 100644
index 9322e1afbb3..00000000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestAsyncDFSRename {
- final Path asyncRenameDir = new Path("/test/async_rename/");
- public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- final private static Configuration CONF = new HdfsConfiguration();
-
- final private static String GROUP1_NAME = "group1";
- final private static String GROUP2_NAME = "group2";
- final private static String USER1_NAME = "user1";
- private static final UserGroupInformation USER1;
-
- private MiniDFSCluster gCluster;
-
- static {
- // explicitly turn on permission checking
- CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map u2g_map = new HashMap(1);
- u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
- DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
- // Initiate all four users
- USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
- GROUP1_NAME, GROUP2_NAME });
- }
-
- @Before
- public void setUp() throws IOException {
- gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
- gCluster.waitActive();
- }
-
- @After
- public void tearDown() throws IOException {
- if (gCluster != null) {
- gCluster.shutdown();
- gCluster = null;
- }
- }
-
- static int countLease(MiniDFSCluster cluster) {
- return TestDFSRename.countLease(cluster);
- }
-
- void list(DistributedFileSystem dfs, String name) throws IOException {
- FileSystem.LOG.info("\n\n" + name);
- for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
- FileSystem.LOG.info("" + s.getPath());
- }
- }
-
- static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
- DataOutputStream a_out = dfs.create(f);
- a_out.writeBytes("something");
- a_out.close();
- }
-
- /**
- * Check the blocks of dst file are cleaned after rename with overwrite
- * Restart NN to check the rename successfully
- */
- @Test
- public void testAsyncRenameWithOverwrite() throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
- Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
- replFactor).build();
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-
- try {
-
- long fileLen = blockSize * 3;
- String src = "/foo/src";
- String dst = "/foo/dst";
- String src2 = "/foo/src2";
- String dst2 = "/foo/dst2";
- Path srcPath = new Path(src);
- Path dstPath = new Path(dst);
- Path srcPath2 = new Path(src2);
- Path dstPath2 = new Path(dst2);
-
- DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
-
- LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), dst, 0, fileLen);
- LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
- cluster.getNameNode(), dst2, 0, fileLen);
- BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
- .getBlockManager();
- assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) != null);
- assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) != null);
-
- Future retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
- Future retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
- retVal1.get();
- retVal2.get();
-
- assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) == null);
- assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
- .getLocalBlock()) == null);
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
- assertFalse(dfs.exists(srcPath));
- assertTrue(dfs.exists(dstPath));
- assertFalse(dfs.exists(srcPath2));
- assertTrue(dfs.exists(dstPath2));
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- @Test
- public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
- final Path renameDir = new Path(
- "/test/concurrent_reanme_with_overwrite_dir/");
- Configuration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .build();
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- int count = 1000;
-
- try {
- long fileLen = blockSize * 3;
- assertTrue(dfs.mkdirs(renameDir));
-
- Map> returnFutures = new HashMap>();
-
- // concurrently invoking many rename
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- }
-
- // wait for completing the calls
- for (int i = 0; i < count; i++) {
- returnFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // very the src dir should not exist, dst should
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- assertFalse(dfs.exists(src));
- assertTrue(dfs.exists(dst));
- }
- } finally {
- dfs.delete(renameDir, true);
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- @Test
- public void testAsyncRenameWithException() throws Exception {
- FileSystem rootFs = FileSystem.get(CONF);
- final Path renameDir = new Path("/test/async_rename_exception/");
- final Path src = new Path(renameDir, "src");
- final Path dst = new Path(renameDir, "dst");
- rootFs.mkdirs(src);
-
- AsyncDistributedFileSystem adfs = USER1
- .doAs(new PrivilegedExceptionAction() {
- @Override
- public AsyncDistributedFileSystem run() throws Exception {
- return gCluster.getFileSystem().getAsyncDistributedFileSystem();
- }
- });
-
- try {
- Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFuture.get();
- } catch (ExecutionException e) {
- checkPermissionDenied(e, src);
- }
- }
-
- private void checkPermissionDenied(final Exception e, final Path dir) {
- assertTrue(e.getCause() instanceof ExecutionException);
- assertTrue("Permission denied messages must carry AccessControlException",
- e.getMessage().contains("AccessControlException"));
- assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(USER1_NAME));
- assertTrue("Permission denied messages must carry the path parent", e
- .getMessage().contains(dir.getParent().toUri().getPath()));
- }
-}
\ No newline at end of file