diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 86e1b431c65..06614db5c4c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -324,6 +324,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
     4*60*60; // 4 hours
 
+  public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
+      "ipc.client.async.calls.max";
+  public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
   public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
     "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
new file mode 100644
index 00000000000..db97b6cb366
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+/**
+ * Signals that the limit of outstanding asynchronous calls has been reached.
+ * This makes application code using async RPC aware that the maximum number
+ * of pending async calls is reached; the application should retrieve results
+ * from the responses of established async calls to free capacity so that
+ * follow-on async calls can proceed.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public AsyncCallLimitExceededException(String message) {
+    super(message);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d59aeb89955..9be46493782 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,7 +159,9 @@ public class Client implements AutoCloseable {
 
   private final boolean fallbackAllowed;
   private final byte[] clientId;
-
+  private final int maxAsyncCalls;
+  private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
+
   /**
    * Executor on which IPC calls' parameters are sent.
    * Deferring the sending of parameters to a separate
@@ -1288,6 +1290,9 @@ public class Client implements AutoCloseable {
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
+    this.maxAsyncCalls = conf.getInt(
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
   }
 
   /**
@@ -1354,6 +1359,20 @@ public class Client implements AutoCloseable {
         fallbackToSimpleAuth);
   }
 
+  private void checkAsyncCall() throws IOException {
+    if (isAsynchronousMode()) {
+      if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+        asyncCallCounter.decrementAndGet();
+        String errMsg = String.format(
+            "Exceeded limit of max asynchronous calls: %d, " +
+            "please configure %s to adjust it.",
+            maxAsyncCalls,
+            CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+        throw new AsyncCallLimitExceededException(errMsg);
+      }
+    }
+  }
+
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1374,24 +1393,38 @@ public class Client implements AutoCloseable {
     final Call call = createCall(rpcKind, rpcRequest);
     final Connection connection = getConnection(remoteId, call, serviceClass,
         fallbackToSimpleAuth);
+
     try {
-      connection.sendRpcRequest(call);                 // send the rpc request
-    } catch (RejectedExecutionException e) {
-      throw new IOException("connection has been closed", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("interrupted waiting to send rpc request to server", e);
-      throw new IOException(e);
+      checkAsyncCall();
+      try {
+        connection.sendRpcRequest(call);               // send the rpc request
+      } catch (RejectedExecutionException e) {
+        throw new IOException("connection has been closed", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("interrupted waiting to send rpc request to server", e);
+        throw new IOException(e);
+      }
+    } catch (Exception e) {
+      if (isAsynchronousMode()) {
+        releaseAsyncCall();
+      }
+      throw e;
     }
 
     if (isAsynchronousMode()) {
       Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+        private final AtomicBoolean called = new AtomicBoolean(false);
         @Override
         public Writable get() throws InterruptedException, ExecutionException {
-          try {
-            set(getRpcResponse(call, connection));
-          } catch (IOException ie) {
-            setException(ie);
+          if (called.compareAndSet(false, true)) {
+            try {
+              set(getRpcResponse(call, connection));
+            } catch (IOException ie) {
+              setException(ie);
+            } finally {
+              releaseAsyncCall();
+            }
           }
           return super.get();
         }
@@ -1427,6 +1460,15 @@ public class Client implements AutoCloseable {
     asynchronousMode.set(async);
   }
 
+  private void releaseAsyncCall() {
+    asyncCallCounter.decrementAndGet();
+  }
+
+  @VisibleForTesting
+  int getAsyncCallCount() {
+    return asyncCallCounter.get();
+  }
+
   private Writable getRpcResponse(final Call call, final Connection connection)
       throws IOException {
     synchronized (call) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c74c7e..8ee3a2c64d3 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -34,6 +35,7 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -54,12 +56,13 @@ public class TestAsyncIPC {
   @Before
   public void setupConf() {
     conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
     Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
     // set asynchronous mode for main thread
     Client.setAsynchronousMode(true);
   }
 
-  protected static class SerialCaller extends Thread {
+  static class AsyncCaller extends Thread {
     private Client client;
     private InetSocketAddress server;
     private int count;
@@ -68,11 +71,11 @@ public class TestAsyncIPC {
         new HashMap<Integer, Future<LongWritable>>();
     Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
 
-    public SerialCaller(Client client, InetSocketAddress server, int count) {
+    public AsyncCaller(Client client, InetSocketAddress server, int count) {
       this.client = client;
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since SerialCaller extends Thread
+      // set asynchronous mode, since AsyncCaller extends Thread
       Client.setAsynchronousMode(true);
     }
 
@@ -107,14 +110,111 @@ public class TestAsyncIPC {
     }
   }
 
-  @Test
-  public void testSerial() throws IOException, InterruptedException,
-      ExecutionException {
-    internalTestSerial(3, false, 2, 5, 100);
-    internalTestSerial(3, true, 2, 5, 10);
+  static class AsyncLimitCaller extends Thread {
+    private Client client;
+    private InetSocketAddress server;
+    private int count;
+    private boolean failed;
+    Map<Integer, Future<LongWritable>> returnFutures =
+        new HashMap<Integer, Future<LongWritable>>();
+    Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
+    int start = 0, end = 0;
+
+    int getStart() {
+      return start;
+    }
+
+    int getEnd() {
+      return end;
+    }
+
+    int getCount() {
+      return count;
+    }
+
+    public AsyncLimitCaller(Client client, InetSocketAddress server, int count) {
+      this(0, client, server, count);
+    }
+
+    final int callerId;
+
+    public AsyncLimitCaller(int callerId, Client client, InetSocketAddress server,
+        int count) {
+      this.client = client;
+      this.server = server;
+      this.count = count;
+      // set asynchronous mode, since AsyncLimitCaller extends Thread
+      Client.setAsynchronousMode(true);
+      this.callerId = callerId;
+    }
+
+    @Override
+    public void run() {
+      // in case Thread#start is called, which will spawn a new thread
+      Client.setAsynchronousMode(true);
+      for (int i = 0; i < count; i++) {
+        try {
+          final long param = TestIPC.RANDOM.nextLong();
+          runCall(i, param);
+        } catch (Exception e) {
+          LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
+              StringUtils.stringifyException(e)));
+          failed = true;
+        }
+      }
+    }
+
+    private void runCall(final int idx, final long param)
+        throws InterruptedException, ExecutionException, IOException {
+      for (;;) {
+        try {
+          doCall(idx, param);
+          return;
+        } catch (AsyncCallLimitExceededException e) {
+          /**
+           * reached the limit of async calls; fetch results of the finished
+           * async calls to let follow-on calls go through
+           */
+          start = end;
+          end = idx;
+          waitForReturnValues(start, end);
+        }
+      }
+    }
+
+    private void doCall(final int idx, final long param) throws IOException {
+      TestIPC.call(client, param, server, conf);
+      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+      returnFutures.put(idx, returnFuture);
+      expectedValues.put(idx, param);
+    }
+
+    private void waitForReturnValues(final int start, final int end)
+        throws InterruptedException, ExecutionException {
+      for (int i = start; i < end; i++) {
+        LongWritable value = returnFutures.get(i).get();
+        if (expectedValues.get(i) != value.get()) {
+          LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
+          failed = true;
+          break;
+        }
+      }
+    }
   }
 
-  public void internalTestSerial(int handlerCount, boolean handlerSleep,
+  @Test(timeout = 60000)
+  public void testAsyncCall() throws IOException, InterruptedException,
+      ExecutionException {
+    internalTestAsyncCall(3, false, 2, 5, 100);
+    internalTestAsyncCall(3, true, 2, 5, 10);
+  }
+
+  @Test(timeout = 60000)
+  public void testAsyncCallLimit() throws IOException,
+      InterruptedException, ExecutionException {
+    internalTestAsyncCallLimit(100, false, 5, 10, 500);
+  }
+
+  public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
     Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -126,9 +226,9 @@ public class TestAsyncIPC {
       clients[i] = new Client(LongWritable.class, conf);
     }
 
-    SerialCaller[] callers = new SerialCaller[callerCount];
+    AsyncCaller[] callers = new AsyncCaller[callerCount];
     for (int i = 0; i < callerCount; i++) {
-      callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
       callers[i].start();
     }
     for (int i = 0; i < callerCount; i++) {
@@ -144,6 +244,75 @@ public class TestAsyncIPC {
     server.stop();
   }
 
+  @Test(timeout = 60000)
+  public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
+      InterruptedException, ExecutionException {
+    int handlerCount = 10, callCount = 100;
+    Server server = new TestIPC.TestServer(handlerCount, false, conf);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+    final Client client = new Client(LongWritable.class, conf);
+
+    int asyncCallCount = client.getAsyncCallCount();
+
+    try {
+      AsyncCaller caller = new AsyncCaller(client, addr, callCount);
+      caller.run();
+
+      caller.waitForReturnValues();
+      String msg = String.format(
+          "First time, expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
+
+      caller.waitForReturnValues();
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+      msg = String.format("Second time, expected not failed for caller: %s.",
+          caller);
+      assertFalse(msg, caller.failed);
+
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
+
+  public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws IOException,
+      InterruptedException, ExecutionException {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
+    Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
+
+    Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    Client[] clients = new Client[clientCount];
+    for (int i = 0; i < clientCount; i++) {
+      clients[i] = new Client(LongWritable.class, conf);
+    }
+
+    AsyncLimitCaller[] callers = new AsyncLimitCaller[callerCount];
+    for (int i = 0; i < callerCount; i++) {
+      callers[i] = new AsyncLimitCaller(i, clients[i % clientCount], addr,
+          callCount);
+      callers[i].start();
+    }
+    for (int i = 0; i < callerCount; i++) {
+      callers[i].join();
+      callers[i].waitForReturnValues(callers[i].getStart(),
+          callers[i].getCount());
+      String msg = String.format("Expected not failed for caller-%d: %s.", i,
+          callers[i]);
+      assertFalse(msg, callers[i].failed);
+    }
+    for (int i = 0; i < clientCount; i++) {
+      clients[i].stop();
+    }
+    server.stop();
+  }
+
   /**
    * Test if (1) the rpc server uses the call id/retry provided by the rpc
    * client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -196,7 +365,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 4);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -235,7 +404,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -272,7 +441,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -313,9 +482,9 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      SerialCaller[] callers = new SerialCaller[callerCount];
+      AsyncCaller[] callers = new AsyncCaller[callerCount];
       for (int i = 0; i < callerCount; ++i) {
-        callers[i] = new SerialCaller(client, addr, perCallerCallCount);
+        callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
         callers[i].start();
       }
       for (int i = 0; i < callerCount; ++i) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 37899aa7030..356ae3ff566 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
@@ -50,11 +51,14 @@ public class AsyncDistributedFileSystem {
     final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
         .getReturnValueCallback();
     Future<T> returnFuture = new AbstractFuture<T>() {
+      private final AtomicBoolean called = new AtomicBoolean(false);
       public T get() throws InterruptedException, ExecutionException {
-        try {
-          set(returnValueCallback.call());
-        } catch (Exception e) {
-          setException(e);
+        if (called.compareAndSet(false, true)) {
+          try {
+            set(returnValueCallback.call());
+          } catch (Exception e) {
+            setException(e);
+          }
         }
         return super.get();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 9322e1afbb3..d129299bf59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -31,80 +30,25 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
-  final Path asyncRenameDir = new Path("/test/async_rename/");
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  final private static Configuration CONF = new HdfsConfiguration();
-
-  final private static String GROUP1_NAME = "group1";
-  final private static String GROUP2_NAME = "group2";
-  final private static String USER1_NAME = "user1";
-  private static final UserGroupInformation USER1;
-
-  private MiniDFSCluster gCluster;
-
-  static {
-    // explicitly turn on permission checking
-    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
-    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
-    // Initiate all four users
-    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
-        GROUP1_NAME, GROUP2_NAME });
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
-    gCluster.waitActive();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (gCluster != null) {
-      gCluster.shutdown();
-      gCluster = null;
-    }
-  }
-
-  static int countLease(MiniDFSCluster cluster) {
-    return TestDFSRename.countLease(cluster);
-  }
-
-  void list(DistributedFileSystem dfs, String name) throws IOException {
-    FileSystem.LOG.info("\n\n" + name);
-    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
-      FileSystem.LOG.info("" + s.getPath());
-    }
-  }
-
-  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
-    DataOutputStream a_out = dfs.create(f);
-    a_out.writeBytes("something");
-    a_out.close();
-  }
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
    * Restart NN to check the rename successfully
    */
-  @Test
+  @Test(timeout = 60000)
   public void testAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
@@ -169,26 +113,26 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test
-  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+  @Test(timeout = 60000)
+  public void testCallGetReturnValueMultipleTimes() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
     final Path renameDir = new Path(
-        "/test/concurrent_reanme_with_overwrite_dir/");
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
-        .build();
+        "/test/testCallGetReturnValueMultipleTimes/");
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
     cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    int count = 1000;
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    final int count = 100;
+    long fileLen = blockSize * 3;
+    final Map<Integer, Future<Void>> returnFutures =
+        new HashMap<Integer, Future<Void>>();
+
+    assertTrue(dfs.mkdirs(renameDir));
 
     try {
-      long fileLen = blockSize * 3;
-      assertTrue(dfs.mkdirs(renameDir));
-
-      Map<Integer, Future<Void>> returnFutures =
-          new HashMap<Integer, Future<Void>>();
-
       // concurrently invoking many rename
       for (int i = 0; i < count; i++) {
         Path src = new Path(renameDir, "src" + i);
@@ -199,8 +143,104 @@ public class TestAsyncDFSRename {
         returnFutures.put(i, returnFuture);
       }
 
-      // wait for completing the calls
+      for (int i = 0; i < 5; i++) {
+        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+            renameDir, dfs);
+      }
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void verifyCallGetReturnValueMultipleTimes(
+      Map<Integer, Future<Void>> returnFutures, int count,
+      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+      throws InterruptedException, ExecutionException, IOException {
+    // wait for completing the calls
+    for (int i = 0; i < count; i++) {
+      returnFutures.get(i).get();
+    }
+
+    // restart NN and check the rename succeeded
+    cluster.restartNameNodes();
+
+    // verify the src dirs no longer exist and the dst dirs do
+    for (int i = 0; i < count; i++) {
+      Path src = new Path(renameDir, "src" + i);
+      Path dst = new Path(renameDir, "dst" + i);
+      assertFalse(dfs.exists(src));
+      assertTrue(dfs.exists(dst));
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(100,
+        "testAggressiveConcurrentAsyncRenameWithOverwrite");
+  }
+
+  @Test(timeout = 60000)
+  public void testConservativeConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(10000,
+        "testConservativeConcurrentAsyncRenameWithOverwrite");
+  }
+
+  private void internalTestConcurrentAsyncRenameWithOverwrite(
+      final int asyncCallLimit, final String basePath) throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    final Path renameDir = new Path(String.format("/test/%s/", basePath));
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        asyncCallLimit);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    int count = 1000;
+    long fileLen = blockSize * 3;
+    int start = 0, end = 0;
+    Map<Integer, Future<Void>> returnFutures =
+        new HashMap<Integer, Future<Void>>();
+
+    assertTrue(dfs.mkdirs(renameDir));
+
+    try {
+      // concurrently invoking many rename
       for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        for (;;) {
+          try {
+            LOG.info("rename #" + i);
+            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+            returnFutures.put(i, returnFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            /**
+             * reached the limit of async calls; fetch results of the finished
+             * async calls to let follow-on calls go through
+             */
+            LOG.error(e);
+            start = end;
+            end = i;
+            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+            waitForReturnValues(returnFutures, start, end);
+          }
+        }
+      }
+
+      // wait for completing the calls
+      for (int i = start; i < count; i++) {
         returnFutures.get(i).get();
       }
 
@@ -215,26 +255,60 @@ public class TestAsyncDFSRename {
           assertTrue(dfs.exists(dst));
       }
     } finally {
-      dfs.delete(renameDir, true);
+      if (dfs != null) {
+        dfs.close();
+      }
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
 
-  @Test
+  private void waitForReturnValues(
+      final Map<Integer, Future<Void>> returnFutures, final int start,
+      final int end) throws InterruptedException, ExecutionException {
+    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+    for (int i = start; i < end; i++) {
+      LOG.info("calling Future#get #" + i);
+      returnFutures.get(i).get();
+    }
+  }
+
+  @Test(timeout = 60000)
   public void testAsyncRenameWithException() throws Exception {
-    FileSystem rootFs = FileSystem.get(CONF);
+    Configuration conf = new HdfsConfiguration();
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    UserGroupInformation ugi1;
+
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(user1, new String[] { group1, group2 });
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
+    // initialize the test user
+    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+        group1, group2 });
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem rootFs = FileSystem.get(conf);
     final Path renameDir = new Path("/test/async_rename_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
 
-    AsyncDistributedFileSystem adfs = USER1
+    AsyncDistributedFileSystem adfs = ugi1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
-            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+            return cluster.getFileSystem().getAsyncDistributedFileSystem();
           }
         });
 
@@ -242,16 +316,24 @@ public class TestAsyncDFSRename {
       Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       returnFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src);
+      checkPermissionDenied(e, src, user1);
+    } finally {
+      if (rootFs != null) {
+        rootFs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
-  private void checkPermissionDenied(final Exception e, final Path dir) {
+  private void checkPermissionDenied(final Exception e, final Path dir,
+      final String user) {
     assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(USER1_NAME));
+        .getMessage().contains(user));
     assertTrue("Permission denied messages must carry the path parent", e
         .getMessage().contains(dir.getParent().toUri().getPath()));
   }
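
Configuration sketch (illustrative, not part of the patch): the limit is read once, at Client construction time, from ipc.client.async.calls.max via the constants this patch adds to CommonConfigurationKeys, so it can be set programmatically or in core-site.xml. A minimal sketch using only public Configuration APIs; the class name AsyncCallLimitConfig is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

public class AsyncCallLimitConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow up to 500 outstanding async calls per Client; the patched
    // default (IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT) is 100.
    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 500);
    // A Client built from this conf rejects calls beyond the limit with
    // AsyncCallLimitExceededException instead of buffering them.
    System.out.println("limit = " + conf.getInt(
        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT));
  }
}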
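Application-side handling (illustrative sketch, assuming the APIs in this patch: AsyncDistributedFileSystem#rename returning Future<Void> and AsyncCallLimitExceededException; the class and method names below are hypothetical): when the limit is hit, drain results of calls already issued to free slots, then retry, mirroring the retry loops in AsyncLimitCaller and internalTestConcurrentAsyncRenameWithOverwrite above:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;

public class AsyncRenameExample {
  /**
   * Issues count renames of dir/srcN to dir/dstN, draining completed
   * futures whenever the client-side limit rejects a new call.
   */
  static void renameAll(AsyncDistributedFileSystem adfs, Path dir, int count)
      throws Exception {
    Map<Integer, Future<Void>> futures =
        new HashMap<Integer, Future<Void>>();
    int drained = 0; // futures [0, drained) have been waited on
    for (int i = 0; i < count; i++) {
      Path src = new Path(dir, "src" + i);
      Path dst = new Path(dir, "dst" + i);
      for (;;) {
        try {
          futures.put(i, adfs.rename(src, dst, Rename.OVERWRITE));
          break; // call accepted; move on to the next rename
        } catch (AsyncCallLimitExceededException e) {
          // Limit reached: fetch results of the calls issued so far to
          // release slots, then retry the rejected rename.
          for (; drained < i; drained++) {
            futures.get(drained).get();
          }
        }
      }
    }
    for (; drained < count; drained++) {
      futures.get(drained).get(); // wait for the remainder
    }
  }
}

Note the design consequence of the patch: releaseAsyncCall() runs inside the future's first get(), so fetching results is what frees capacity; merely pausing without calling Future#get would leave the counter saturated.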