diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 06614db5c4c..86e1b431c65 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -324,9 +324,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours
- public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
- "ipc.client.async.calls.max";
- public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
deleted file mode 100644
index db97b6cb366..00000000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import java.io.IOException;
-
-/**
- * Signals that an AsyncCallLimitExceededException has occurred. This class is
- * used to make application code using async RPC aware that limit of max async
- * calls is reached, application code need to retrieve results from response of
- * established async calls to avoid buffer overflow in order for follow-on async
- * calls going correctly.
- */
-public class AsyncCallLimitExceededException extends IOException {
- private static final long serialVersionUID = 1L;
-
- public AsyncCallLimitExceededException(String message) {
- super(message);
- }
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 9be46493782..d59aeb89955 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,9 +159,7 @@ public class Client implements AutoCloseable {
private final boolean fallbackAllowed;
private final byte[] clientId;
- private final int maxAsyncCalls;
- private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
-
+
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@@ -1290,9 +1288,6 @@ public class Client implements AutoCloseable {
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
- this.maxAsyncCalls = conf.getInt(
- CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@@ -1359,20 +1354,6 @@ public class Client implements AutoCloseable {
fallbackToSimpleAuth);
}
- private void checkAsyncCall() throws IOException {
- if (isAsynchronousMode()) {
- if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
- asyncCallCounter.decrementAndGet();
- String errMsg = String.format(
- "Exceeded limit of max asynchronous calls: %d, " +
- "please configure %s to adjust it.",
- maxAsyncCalls,
- CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
- throw new AsyncCallLimitExceededException(errMsg);
- }
- }
- }
-
/**
* Make a call, passing rpcRequest
, to the IPC server defined by
* remoteId
, returning the rpc response.
@@ -1393,38 +1374,24 @@ public class Client implements AutoCloseable {
final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
-
try {
- checkAsyncCall();
- try {
- connection.sendRpcRequest(call); // send the rpc request
- } catch (RejectedExecutionException e) {
- throw new IOException("connection has been closed", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("interrupted waiting to send rpc request to server", e);
- throw new IOException(e);
- }
- } catch(Exception e) {
- if (isAsynchronousMode()) {
- releaseAsyncCall();
- }
- throw e;
+ connection.sendRpcRequest(call); // send the rpc request
+ } catch (RejectedExecutionException e) {
+ throw new IOException("connection has been closed", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("interrupted waiting to send rpc request to server", e);
+ throw new IOException(e);
}
if (isAsynchronousMode()) {
Future returnFuture = new AbstractFuture() {
- private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
public Writable get() throws InterruptedException, ExecutionException {
- if (callled.compareAndSet(false, true)) {
- try {
- set(getRpcResponse(call, connection));
- } catch (IOException ie) {
- setException(ie);
- } finally {
- releaseAsyncCall();
- }
+ try {
+ set(getRpcResponse(call, connection));
+ } catch (IOException ie) {
+ setException(ie);
}
return super.get();
}
@@ -1460,15 +1427,6 @@ public class Client implements AutoCloseable {
asynchronousMode.set(async);
}
- private void releaseAsyncCall() {
- asyncCallCounter.decrementAndGet();
- }
-
- @VisibleForTesting
- int getAsyncCallCount() {
- return asyncCallCounter.get();
- }
-
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
synchronized (call) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c64d3..6cf75c74c7e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -35,7 +34,6 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -56,13 +54,12 @@ public class TestAsyncIPC {
@Before
public void setupConf() {
conf = new Configuration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread
Client.setAsynchronousMode(true);
}
- static class AsyncCaller extends Thread {
+ protected static class SerialCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
@@ -71,11 +68,11 @@ public class TestAsyncIPC {
new HashMap>();
Map expectedValues = new HashMap();
- public AsyncCaller(Client client, InetSocketAddress server, int count) {
+ public SerialCaller(Client client, InetSocketAddress server, int count) {
this.client = client;
this.server = server;
this.count = count;
- // set asynchronous mode, since AsyncCaller extends Thread
+ // set asynchronous mode, since SerialCaller extends Thread
Client.setAsynchronousMode(true);
}
@@ -110,111 +107,14 @@ public class TestAsyncIPC {
}
}
- static class AsyncLimitlCaller extends Thread {
- private Client client;
- private InetSocketAddress server;
- private int count;
- private boolean failed;
- Map> returnFutures = new HashMap>();
- Map expectedValues = new HashMap();
- int start = 0, end = 0;
-
- int getStart() {
- return start;
- }
-
- int getEnd() {
- return end;
- }
-
- int getCount() {
- return count;
- }
-
- public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
- this(0, client, server, count);
- }
-
- final int callerId;
-
- public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
- int count) {
- this.client = client;
- this.server = server;
- this.count = count;
- // set asynchronous mode, since AsyncLimitlCaller extends Thread
- Client.setAsynchronousMode(true);
- this.callerId = callerId;
- }
-
- @Override
- public void run() {
- // in case Thread#Start is called, which will spawn new thread
- Client.setAsynchronousMode(true);
- for (int i = 0; i < count; i++) {
- try {
- final long param = TestIPC.RANDOM.nextLong();
- runCall(i, param);
- } catch (Exception e) {
- LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
- StringUtils.stringifyException(e)));
- failed = true;
- }
- }
- }
-
- private void runCall(final int idx, final long param)
- throws InterruptedException, ExecutionException, IOException {
- for (;;) {
- try {
- doCall(idx, param);
- return;
- } catch (AsyncCallLimitExceededException e) {
- /**
- * reached limit of async calls, fetch results of finished async calls
- * to let follow-on calls go
- */
- start = end;
- end = idx;
- waitForReturnValues(start, end);
- }
- }
- }
-
- private void doCall(final int idx, final long param) throws IOException {
- TestIPC.call(client, param, server, conf);
- Future returnFuture = Client.getReturnRpcResponse();
- returnFutures.put(idx, returnFuture);
- expectedValues.put(idx, param);
- }
-
- private void waitForReturnValues(final int start, final int end)
- throws InterruptedException, ExecutionException {
- for (int i = start; i < end; i++) {
- LongWritable value = returnFutures.get(i).get();
- if (expectedValues.get(i) != value.get()) {
- LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
- failed = true;
- break;
- }
- }
- }
- }
-
- @Test(timeout = 60000)
- public void testAsyncCall() throws IOException, InterruptedException,
+ @Test
+ public void testSerial() throws IOException, InterruptedException,
ExecutionException {
- internalTestAsyncCall(3, false, 2, 5, 100);
- internalTestAsyncCall(3, true, 2, 5, 10);
+ internalTestSerial(3, false, 2, 5, 100);
+ internalTestSerial(3, true, 2, 5, 10);
}
- @Test(timeout = 60000)
- public void testAsyncCallLimit() throws IOException,
- InterruptedException, ExecutionException {
- internalTestAsyncCallLimit(100, false, 5, 10, 500);
- }
-
- public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
+ public void internalTestSerial(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -226,9 +126,9 @@ public class TestAsyncIPC {
clients[i] = new Client(LongWritable.class, conf);
}
- AsyncCaller[] callers = new AsyncCaller[callerCount];
+ SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
- callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
+ callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
@@ -244,75 +144,6 @@ public class TestAsyncIPC {
server.stop();
}
- @Test(timeout = 60000)
- public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
- InterruptedException, ExecutionException {
- int handlerCount = 10, callCount = 100;
- Server server = new TestIPC.TestServer(handlerCount, false, conf);
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
- server.start();
- final Client client = new Client(LongWritable.class, conf);
-
- int asyncCallCount = client.getAsyncCallCount();
-
- try {
- AsyncCaller caller = new AsyncCaller(client, addr, callCount);
- caller.run();
-
- caller.waitForReturnValues();
- String msg = String.format(
- "First time, expected not failed for caller: %s.", caller);
- assertFalse(msg, caller.failed);
-
- caller.waitForReturnValues();
- assertTrue(asyncCallCount == client.getAsyncCallCount());
- msg = String.format("Second time, expected not failed for caller: %s.",
- caller);
- assertFalse(msg, caller.failed);
-
- assertTrue(asyncCallCount == client.getAsyncCallCount());
- } finally {
- client.stop();
- server.stop();
- }
- }
-
- public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
- int clientCount, int callerCount, int callCount) throws IOException,
- InterruptedException, ExecutionException {
- Configuration conf = new Configuration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
- Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
-
- Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
- server.start();
-
- Client[] clients = new Client[clientCount];
- for (int i = 0; i < clientCount; i++) {
- clients[i] = new Client(LongWritable.class, conf);
- }
-
- AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
- for (int i = 0; i < callerCount; i++) {
- callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
- callCount);
- callers[i].start();
- }
- for (int i = 0; i < callerCount; i++) {
- callers[i].join();
- callers[i].waitForReturnValues(callers[i].getStart(),
- callers[i].getCount());
- String msg = String.format("Expected not failed for caller-%d: %s.", i,
- callers[i]);
- assertFalse(msg, callers[i].failed);
- }
- for (int i = 0; i < clientCount; i++) {
- clients[i].stop();
- }
- server.stop();
- }
-
/**
* Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -365,7 +196,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final AsyncCaller caller = new AsyncCaller(client, addr, 4);
+ final SerialCaller caller = new SerialCaller(client, addr, 4);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -404,7 +235,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -441,7 +272,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -482,9 +313,9 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- AsyncCaller[] callers = new AsyncCaller[callerCount];
+ SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
- callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
+ callers[i] = new SerialCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 356ae3ff566..37899aa7030 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
@@ -51,14 +50,11 @@ public class AsyncDistributedFileSystem {
final Callable returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future returnFuture = new AbstractFuture() {
- private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
- if (called.compareAndSet(false, true)) {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
- }
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
}
return super.get();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index d129299bf59..9322e1afbb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@@ -30,25 +31,80 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
+ final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+ final private static Configuration CONF = new HdfsConfiguration();
+
+ final private static String GROUP1_NAME = "group1";
+ final private static String GROUP2_NAME = "group2";
+ final private static String USER1_NAME = "user1";
+ private static final UserGroupInformation USER1;
+
+ private MiniDFSCluster gCluster;
+
+ static {
+ // explicitly turn on permission checking
+ CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map u2g_map = new HashMap(1);
+ u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+ DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+ // Initiate all four users
+ USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+ GROUP1_NAME, GROUP2_NAME });
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+ gCluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (gCluster != null) {
+ gCluster.shutdown();
+ gCluster = null;
+ }
+ }
+
+ static int countLease(MiniDFSCluster cluster) {
+ return TestDFSRename.countLease(cluster);
+ }
+
+ void list(DistributedFileSystem dfs, String name) throws IOException {
+ FileSystem.LOG.info("\n\n" + name);
+ for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+ FileSystem.LOG.info("" + s.getPath());
+ }
+ }
+
+ static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+ DataOutputStream a_out = dfs.create(f);
+ a_out.writeBytes("something");
+ a_out.close();
+ }
/**
* Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully
*/
- @Test(timeout = 60000)
+ @Test
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
@@ -113,26 +169,26 @@ public class TestAsyncDFSRename {
}
}
- @Test(timeout = 60000)
- public void testCallGetReturnValueMultipleTimes() throws Exception {
+ @Test
+ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
- "/test/testCallGetReturnValueMultipleTimes/");
- final Configuration conf = new HdfsConfiguration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(2).build();
+ "/test/concurrent_reanme_with_overwrite_dir/");
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
cluster.waitActive();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- final int count = 100;
- long fileLen = blockSize * 3;
- final Map> returnFutures = new HashMap>();
-
- assertTrue(dfs.mkdirs(renameDir));
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ int count = 1000;
try {
+ long fileLen = blockSize * 3;
+ assertTrue(dfs.mkdirs(renameDir));
+
+ Map> returnFutures = new HashMap>();
+
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
@@ -143,104 +199,8 @@ public class TestAsyncDFSRename {
returnFutures.put(i, returnFuture);
}
- for (int i = 0; i < 5; i++) {
- verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
- renameDir, dfs);
- }
- } finally {
- if (dfs != null) {
- dfs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- private void verifyCallGetReturnValueMultipleTimes(
- Map> returnFutures, int count,
- MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
- throws InterruptedException, ExecutionException, IOException {
- // wait for completing the calls
- for (int i = 0; i < count; i++) {
- returnFutures.get(i).get();
- }
-
- // Restart NN and check the rename successfully
- cluster.restartNameNodes();
-
- // very the src dir should not exist, dst should
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- assertFalse(dfs.exists(src));
- assertTrue(dfs.exists(dst));
- }
- }
-
- @Test(timeout = 120000)
- public void testAggressiveConcurrentAsyncRenameWithOverwrite()
- throws Exception {
- internalTestConcurrentAsyncRenameWithOverwrite(100,
- "testAggressiveConcurrentAsyncRenameWithOverwrite");
- }
-
- @Test(timeout = 60000)
- public void testConservativeConcurrentAsyncRenameWithOverwrite()
- throws Exception {
- internalTestConcurrentAsyncRenameWithOverwrite(10000,
- "testConservativeConcurrentAsyncRenameWithOverwrite");
- }
-
- private void internalTestConcurrentAsyncRenameWithOverwrite(
- final int asyncCallLimit, final String basePath) throws Exception {
- final short replFactor = 2;
- final long blockSize = 512;
- final Path renameDir = new Path(String.format("/test/%s/", basePath));
- Configuration conf = new HdfsConfiguration();
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
- asyncCallLimit);
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .build();
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- int count = 1000;
- long fileLen = blockSize * 3;
- int start = 0, end = 0;
- Map> returnFutures = new HashMap>();
-
- assertTrue(dfs.mkdirs(renameDir));
-
- try {
- // concurrently invoking many rename
- for (int i = 0; i < count; i++) {
- Path src = new Path(renameDir, "src" + i);
- Path dst = new Path(renameDir, "dst" + i);
- DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
- DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
- for (;;) {
- try {
- LOG.info("rename #" + i);
- Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
- returnFutures.put(i, returnFuture);
- break;
- } catch (AsyncCallLimitExceededException e) {
- /**
- * reached limit of async calls, fetch results of finished async
- * calls to let follow-on calls go
- */
- LOG.error(e);
- start = end;
- end = i;
- LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
- waitForReturnValues(returnFutures, start, end);
- }
- }
- }
-
// wait for completing the calls
- for (int i = start; i < count; i++) {
+ for (int i = 0; i < count; i++) {
returnFutures.get(i).get();
}
@@ -255,60 +215,26 @@ public class TestAsyncDFSRename {
assertTrue(dfs.exists(dst));
}
} finally {
- if (dfs != null) {
- dfs.close();
- }
+ dfs.delete(renameDir, true);
if (cluster != null) {
cluster.shutdown();
}
}
}
- private void waitForReturnValues(
- final Map> returnFutures, final int start,
- final int end) throws InterruptedException, ExecutionException {
- LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
- for (int i = start; i < end; i++) {
- LOG.info("calling Future#get #" + i);
- returnFutures.get(i).get();
- }
- }
-
- @Test(timeout = 60000)
+ @Test
public void testAsyncRenameWithException() throws Exception {
- Configuration conf = new HdfsConfiguration();
- String group1 = "group1";
- String group2 = "group2";
- String user1 = "user1";
- UserGroupInformation ugi1;
-
- // explicitly turn on permission checking
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map u2g_map = new HashMap(1);
- u2g_map.put(user1, new String[] { group1, group2 });
- DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
-
- // Initiate all four users
- ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
- group1, group2 });
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(3).build();
- cluster.waitActive();
-
- FileSystem rootFs = FileSystem.get(conf);
+ FileSystem rootFs = FileSystem.get(CONF);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
- AsyncDistributedFileSystem adfs = ugi1
+ AsyncDistributedFileSystem adfs = USER1
.doAs(new PrivilegedExceptionAction() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
- return cluster.getFileSystem().getAsyncDistributedFileSystem();
+ return gCluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
@@ -316,24 +242,16 @@ public class TestAsyncDFSRename {
Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src, user1);
- } finally {
- if (rootFs != null) {
- rootFs.close();
- }
- if (cluster != null) {
- cluster.shutdown();
- }
+ checkPermissionDenied(e, src);
}
}
- private void checkPermissionDenied(final Exception e, final Path dir,
- final String user) {
+ private void checkPermissionDenied(final Exception e, final Path dir) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(user));
+ .getMessage().contains(USER1_NAME));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}