From 4d36b221a24e3b626bb91093b0bb0fd377061cae Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Fri, 3 Jun 2016 18:09:18 -0700 Subject: [PATCH] Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou" This reverts commit 1b9f18623ab55507bea94888317c7d63d0f4a6f2. --- .../hadoop/fs/CommonConfigurationKeys.java | 3 - .../ipc/AsyncCallLimitExceededException.java | 36 --- .../java/org/apache/hadoop/ipc/Client.java | 66 +---- .../org/apache/hadoop/ipc/TestAsyncIPC.java | 199 ++------------ .../hdfs/AsyncDistributedFileSystem.java | 12 +- .../hadoop/hdfs/TestAsyncDFSRename.java | 246 ++++++------------ 6 files changed, 113 insertions(+), 449 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 06614db5c4c..86e1b431c65 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -324,9 +324,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT = 4*60*60; // 4 hours - public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY = - "ipc.client.async.calls.max"; - public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100; public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed"; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java deleted file mode 100644 index db97b6cb366..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import java.io.IOException; - -/** - * Signals that an AsyncCallLimitExceededException has occurred. This class is - * used to make application code using async RPC aware that limit of max async - * calls is reached, application code need to retrieve results from response of - * established async calls to avoid buffer overflow in order for follow-on async - * calls going correctly. - */ -public class AsyncCallLimitExceededException extends IOException { - private static final long serialVersionUID = 1L; - - public AsyncCallLimitExceededException(String message) { - super(message); - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 9be46493782..d59aeb89955 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -159,9 +159,7 @@ public class Client implements AutoCloseable { private final boolean fallbackAllowed; private final byte[] clientId; - private final int maxAsyncCalls; - private final AtomicInteger asyncCallCounter = new AtomicInteger(0); - + /** * Executor on which IPC calls' parameters are sent. * Deferring the sending of parameters to a separate @@ -1290,9 +1288,6 @@ public class Client implements AutoCloseable { CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.clientId = ClientId.getClientId(); this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); - this.maxAsyncCalls = conf.getInt( - CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); } /** @@ -1359,20 +1354,6 @@ public class Client implements AutoCloseable { fallbackToSimpleAuth); } - private void checkAsyncCall() throws IOException { - if (isAsynchronousMode()) { - if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) { - asyncCallCounter.decrementAndGet(); - String errMsg = String.format( - "Exceeded limit of max asynchronous calls: %d, " + - "please configure %s to adjust it.", - maxAsyncCalls, - CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); - throw new AsyncCallLimitExceededException(errMsg); - } - } - } - /** * Make a call, passing rpcRequest, to the IPC server defined by * remoteId, returning the rpc response. @@ -1393,38 +1374,24 @@ public class Client implements AutoCloseable { final Call call = createCall(rpcKind, rpcRequest); final Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth); - try { - checkAsyncCall(); - try { - connection.sendRpcRequest(call); // send the rpc request - } catch (RejectedExecutionException e) { - throw new IOException("connection has been closed", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("interrupted waiting to send rpc request to server", e); - throw new IOException(e); - } - } catch(Exception e) { - if (isAsynchronousMode()) { - releaseAsyncCall(); - } - throw e; + connection.sendRpcRequest(call); // send the rpc request + } catch (RejectedExecutionException e) { + throw new IOException("connection has been closed", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("interrupted waiting to send rpc request to server", e); + throw new IOException(e); } if (isAsynchronousMode()) { Future returnFuture = new AbstractFuture() { - private final AtomicBoolean callled = new AtomicBoolean(false); @Override public Writable get() throws InterruptedException, ExecutionException { - if (callled.compareAndSet(false, true)) { - try { - set(getRpcResponse(call, connection)); - } catch (IOException ie) { - setException(ie); - } finally { - releaseAsyncCall(); - } + try { + set(getRpcResponse(call, connection)); + } catch (IOException ie) { + setException(ie); } return super.get(); } @@ -1460,15 +1427,6 @@ public class Client implements AutoCloseable { asynchronousMode.set(async); } - private void releaseAsyncCall() { - asyncCallCounter.decrementAndGet(); - } - - @VisibleForTesting - int getAsyncCallCount() { - return asyncCallCounter.get(); - } - private Writable getRpcResponse(final Call call, final Connection connection) throws IOException { synchronized (call) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 8ee3a2c64d3..6cf75c74c7e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -20,7 +20,6 @@ package org.apache.hadoop.ipc; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.InetSocketAddress; @@ -35,7 +34,6 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RPC.RpcKind; @@ -56,13 +54,12 @@ public class TestAsyncIPC { @Before public void setupConf() { conf = new Configuration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000); Client.setPingInterval(conf, TestIPC.PING_INTERVAL); // set asynchronous mode for main thread Client.setAsynchronousMode(true); } - static class AsyncCaller extends Thread { + protected static class SerialCaller extends Thread { private Client client; private InetSocketAddress server; private int count; @@ -71,11 +68,11 @@ public class TestAsyncIPC { new HashMap>(); Map expectedValues = new HashMap(); - public AsyncCaller(Client client, InetSocketAddress server, int count) { + public SerialCaller(Client client, InetSocketAddress server, int count) { this.client = client; this.server = server; this.count = count; - // set asynchronous mode, since AsyncCaller extends Thread + // set asynchronous mode, since SerialCaller extends Thread Client.setAsynchronousMode(true); } @@ -110,111 +107,14 @@ public class TestAsyncIPC { } } - static class AsyncLimitlCaller extends Thread { - private Client client; - private InetSocketAddress server; - private int count; - private boolean failed; - Map> returnFutures = new HashMap>(); - Map expectedValues = new HashMap(); - int start = 0, end = 0; - - int getStart() { - return start; - } - - int getEnd() { - return end; - } - - int getCount() { - return count; - } - - public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) { - this(0, client, server, count); - } - - final int callerId; - - public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server, - int count) { - this.client = client; - this.server = server; - this.count = count; - // set asynchronous mode, since AsyncLimitlCaller extends Thread - Client.setAsynchronousMode(true); - this.callerId = callerId; - } - - @Override - public void run() { - // in case Thread#Start is called, which will spawn new thread - Client.setAsynchronousMode(true); - for (int i = 0; i < count; i++) { - try { - final long param = TestIPC.RANDOM.nextLong(); - runCall(i, param); - } catch (Exception e) { - LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i, - StringUtils.stringifyException(e))); - failed = true; - } - } - } - - private void runCall(final int idx, final long param) - throws InterruptedException, ExecutionException, IOException { - for (;;) { - try { - doCall(idx, param); - return; - } catch (AsyncCallLimitExceededException e) { - /** - * reached limit of async calls, fetch results of finished async calls - * to let follow-on calls go - */ - start = end; - end = idx; - waitForReturnValues(start, end); - } - } - } - - private void doCall(final int idx, final long param) throws IOException { - TestIPC.call(client, param, server, conf); - Future returnFuture = Client.getReturnRpcResponse(); - returnFutures.put(idx, returnFuture); - expectedValues.put(idx, param); - } - - private void waitForReturnValues(final int start, final int end) - throws InterruptedException, ExecutionException { - for (int i = start; i < end; i++) { - LongWritable value = returnFutures.get(i).get(); - if (expectedValues.get(i) != value.get()) { - LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i)); - failed = true; - break; - } - } - } - } - - @Test(timeout = 60000) - public void testAsyncCall() throws IOException, InterruptedException, + @Test + public void testSerial() throws IOException, InterruptedException, ExecutionException { - internalTestAsyncCall(3, false, 2, 5, 100); - internalTestAsyncCall(3, true, 2, 5, 10); + internalTestSerial(3, false, 2, 5, 100); + internalTestSerial(3, true, 2, 5, 10); } - @Test(timeout = 60000) - public void testAsyncCallLimit() throws IOException, - InterruptedException, ExecutionException { - internalTestAsyncCallLimit(100, false, 5, 10, 500); - } - - public void internalTestAsyncCall(int handlerCount, boolean handlerSleep, + public void internalTestSerial(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException, ExecutionException { Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf); @@ -226,9 +126,9 @@ public class TestAsyncIPC { clients[i] = new Client(LongWritable.class, conf); } - AsyncCaller[] callers = new AsyncCaller[callerCount]; + SerialCaller[] callers = new SerialCaller[callerCount]; for (int i = 0; i < callerCount; i++) { - callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount); + callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount); callers[i].start(); } for (int i = 0; i < callerCount; i++) { @@ -244,75 +144,6 @@ public class TestAsyncIPC { server.stop(); } - @Test(timeout = 60000) - public void testCallGetReturnRpcResponseMultipleTimes() throws IOException, - InterruptedException, ExecutionException { - int handlerCount = 10, callCount = 100; - Server server = new TestIPC.TestServer(handlerCount, false, conf); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - server.start(); - final Client client = new Client(LongWritable.class, conf); - - int asyncCallCount = client.getAsyncCallCount(); - - try { - AsyncCaller caller = new AsyncCaller(client, addr, callCount); - caller.run(); - - caller.waitForReturnValues(); - String msg = String.format( - "First time, expected not failed for caller: %s.", caller); - assertFalse(msg, caller.failed); - - caller.waitForReturnValues(); - assertTrue(asyncCallCount == client.getAsyncCallCount()); - msg = String.format("Second time, expected not failed for caller: %s.", - caller); - assertFalse(msg, caller.failed); - - assertTrue(asyncCallCount == client.getAsyncCallCount()); - } finally { - client.stop(); - server.stop(); - } - } - - public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep, - int clientCount, int callerCount, int callCount) throws IOException, - InterruptedException, ExecutionException { - Configuration conf = new Configuration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100); - Client.setPingInterval(conf, TestIPC.PING_INTERVAL); - - Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - server.start(); - - Client[] clients = new Client[clientCount]; - for (int i = 0; i < clientCount; i++) { - clients[i] = new Client(LongWritable.class, conf); - } - - AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount]; - for (int i = 0; i < callerCount; i++) { - callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr, - callCount); - callers[i].start(); - } - for (int i = 0; i < callerCount; i++) { - callers[i].join(); - callers[i].waitForReturnValues(callers[i].getStart(), - callers[i].getCount()); - String msg = String.format("Expected not failed for caller-%d: %s.", i, - callers[i]); - assertFalse(msg, callers[i].failed); - } - for (int i = 0; i < clientCount; i++) { - clients[i].stop(); - } - server.stop(); - } - /** * Test if (1) the rpc server uses the call id/retry provided by the rpc * client, and (2) the rpc client receives the same call id/retry from the rpc @@ -365,7 +196,7 @@ public class TestAsyncIPC { try { InetSocketAddress addr = NetUtils.getConnectAddress(server); server.start(); - final AsyncCaller caller = new AsyncCaller(client, addr, 4); + final SerialCaller caller = new SerialCaller(client, addr, 4); caller.run(); caller.waitForReturnValues(); String msg = String.format("Expected not failed for caller: %s.", caller); @@ -404,7 +235,7 @@ public class TestAsyncIPC { try { InetSocketAddress addr = NetUtils.getConnectAddress(server); server.start(); - final AsyncCaller caller = new AsyncCaller(client, addr, 10); + final SerialCaller caller = new SerialCaller(client, addr, 10); caller.run(); caller.waitForReturnValues(); String msg = String.format("Expected not failed for caller: %s.", caller); @@ -441,7 +272,7 @@ public class TestAsyncIPC { try { InetSocketAddress addr = NetUtils.getConnectAddress(server); server.start(); - final AsyncCaller caller = new AsyncCaller(client, addr, 10); + final SerialCaller caller = new SerialCaller(client, addr, 10); caller.run(); caller.waitForReturnValues(); String msg = String.format("Expected not failed for caller: %s.", caller); @@ -482,9 +313,9 @@ public class TestAsyncIPC { try { InetSocketAddress addr = NetUtils.getConnectAddress(server); server.start(); - AsyncCaller[] callers = new AsyncCaller[callerCount]; + SerialCaller[] callers = new SerialCaller[callerCount]; for (int i = 0; i < callerCount; ++i) { - callers[i] = new AsyncCaller(client, addr, perCallerCallCount); + callers[i] = new SerialCaller(client, addr, perCallerCallCount); callers[i].start(); } for (int i = 0; i < callerCount; ++i) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 356ae3ff566..37899aa7030 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.fs.Options; @@ -51,14 +50,11 @@ public class AsyncDistributedFileSystem { final Callable returnValueCallback = ClientNamenodeProtocolTranslatorPB .getReturnValueCallback(); Future returnFuture = new AbstractFuture() { - private final AtomicBoolean called = new AtomicBoolean(false); public T get() throws InterruptedException, ExecutionException { - if (called.compareAndSet(false, true)) { - try { - set(returnValueCallback.call()); - } catch (Exception e) { - setException(e); - } + try { + set(returnValueCallback.call()); + } catch (Exception e) { + setException(e); } return super.get(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java index d129299bf59..9322e1afbb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.DataOutputStream; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.HashMap; @@ -30,25 +31,80 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.ipc.AsyncCallLimitExceededException; import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class TestAsyncDFSRename { + final Path asyncRenameDir = new Path("/test/async_rename/"); public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); + final private static Configuration CONF = new HdfsConfiguration(); + + final private static String GROUP1_NAME = "group1"; + final private static String GROUP2_NAME = "group2"; + final private static String USER1_NAME = "user1"; + private static final UserGroupInformation USER1; + + private MiniDFSCluster gCluster; + + static { + // explicitly turn on permission checking + CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + + // create fake mapping for the groups + Map u2g_map = new HashMap(1); + u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME }); + DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map); + + // Initiate all four users + USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] { + GROUP1_NAME, GROUP2_NAME }); + } + + @Before + public void setUp() throws IOException { + gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build(); + gCluster.waitActive(); + } + + @After + public void tearDown() throws IOException { + if (gCluster != null) { + gCluster.shutdown(); + gCluster = null; + } + } + + static int countLease(MiniDFSCluster cluster) { + return TestDFSRename.countLease(cluster); + } + + void list(DistributedFileSystem dfs, String name) throws IOException { + FileSystem.LOG.info("\n\n" + name); + for (FileStatus s : dfs.listStatus(asyncRenameDir)) { + FileSystem.LOG.info("" + s.getPath()); + } + } + + static void createFile(DistributedFileSystem dfs, Path f) throws IOException { + DataOutputStream a_out = dfs.create(f); + a_out.writeBytes("something"); + a_out.close(); + } /** * Check the blocks of dst file are cleaned after rename with overwrite * Restart NN to check the rename successfully */ - @Test(timeout = 60000) + @Test public void testAsyncRenameWithOverwrite() throws Exception { final short replFactor = 2; final long blockSize = 512; @@ -113,26 +169,26 @@ public class TestAsyncDFSRename { } } - @Test(timeout = 60000) - public void testCallGetReturnValueMultipleTimes() throws Exception { + @Test + public void testConcurrentAsyncRenameWithOverwrite() throws Exception { final short replFactor = 2; final long blockSize = 512; final Path renameDir = new Path( - "/test/testCallGetReturnValueMultipleTimes/"); - final Configuration conf = new HdfsConfiguration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(2).build(); + "/test/concurrent_reanme_with_overwrite_dir/"); + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .build(); cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - final int count = 100; - long fileLen = blockSize * 3; - final Map> returnFutures = new HashMap>(); - - assertTrue(dfs.mkdirs(renameDir)); + DistributedFileSystem dfs = cluster.getFileSystem(); + AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); + int count = 1000; try { + long fileLen = blockSize * 3; + assertTrue(dfs.mkdirs(renameDir)); + + Map> returnFutures = new HashMap>(); + // concurrently invoking many rename for (int i = 0; i < count; i++) { Path src = new Path(renameDir, "src" + i); @@ -143,104 +199,8 @@ public class TestAsyncDFSRename { returnFutures.put(i, returnFuture); } - for (int i = 0; i < 5; i++) { - verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster, - renameDir, dfs); - } - } finally { - if (dfs != null) { - dfs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } - } - - private void verifyCallGetReturnValueMultipleTimes( - Map> returnFutures, int count, - MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs) - throws InterruptedException, ExecutionException, IOException { - // wait for completing the calls - for (int i = 0; i < count; i++) { - returnFutures.get(i).get(); - } - - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - - // very the src dir should not exist, dst should - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - assertFalse(dfs.exists(src)); - assertTrue(dfs.exists(dst)); - } - } - - @Test(timeout = 120000) - public void testAggressiveConcurrentAsyncRenameWithOverwrite() - throws Exception { - internalTestConcurrentAsyncRenameWithOverwrite(100, - "testAggressiveConcurrentAsyncRenameWithOverwrite"); - } - - @Test(timeout = 60000) - public void testConservativeConcurrentAsyncRenameWithOverwrite() - throws Exception { - internalTestConcurrentAsyncRenameWithOverwrite(10000, - "testConservativeConcurrentAsyncRenameWithOverwrite"); - } - - private void internalTestConcurrentAsyncRenameWithOverwrite( - final int asyncCallLimit, final String basePath) throws Exception { - final short replFactor = 2; - final long blockSize = 512; - final Path renameDir = new Path(String.format("/test/%s/", basePath)); - Configuration conf = new HdfsConfiguration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - asyncCallLimit); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) - .build(); - cluster.waitActive(); - DistributedFileSystem dfs = cluster.getFileSystem(); - AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - int count = 1000; - long fileLen = blockSize * 3; - int start = 0, end = 0; - Map> returnFutures = new HashMap>(); - - assertTrue(dfs.mkdirs(renameDir)); - - try { - // concurrently invoking many rename - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); - for (;;) { - try { - LOG.info("rename #" + i); - Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); - returnFutures.put(i, returnFuture); - break; - } catch (AsyncCallLimitExceededException e) { - /** - * reached limit of async calls, fetch results of finished async - * calls to let follow-on calls go - */ - LOG.error(e); - start = end; - end = i; - LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i)); - waitForReturnValues(returnFutures, start, end); - } - } - } - // wait for completing the calls - for (int i = start; i < count; i++) { + for (int i = 0; i < count; i++) { returnFutures.get(i).get(); } @@ -255,60 +215,26 @@ public class TestAsyncDFSRename { assertTrue(dfs.exists(dst)); } } finally { - if (dfs != null) { - dfs.close(); - } + dfs.delete(renameDir, true); if (cluster != null) { cluster.shutdown(); } } } - private void waitForReturnValues( - final Map> returnFutures, final int start, - final int end) throws InterruptedException, ExecutionException { - LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end)); - for (int i = start; i < end; i++) { - LOG.info("calling Future#get #" + i); - returnFutures.get(i).get(); - } - } - - @Test(timeout = 60000) + @Test public void testAsyncRenameWithException() throws Exception { - Configuration conf = new HdfsConfiguration(); - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - UserGroupInformation ugi1; - - // explicitly turn on permission checking - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - - // create fake mapping for the groups - Map u2g_map = new HashMap(1); - u2g_map.put(user1, new String[] { group1, group2 }); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); - - // Initiate all four users - ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { - group1, group2 }); - - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(3).build(); - cluster.waitActive(); - - FileSystem rootFs = FileSystem.get(conf); + FileSystem rootFs = FileSystem.get(CONF); final Path renameDir = new Path("/test/async_rename_exception/"); final Path src = new Path(renameDir, "src"); final Path dst = new Path(renameDir, "dst"); rootFs.mkdirs(src); - AsyncDistributedFileSystem adfs = ugi1 + AsyncDistributedFileSystem adfs = USER1 .doAs(new PrivilegedExceptionAction() { @Override public AsyncDistributedFileSystem run() throws Exception { - return cluster.getFileSystem().getAsyncDistributedFileSystem(); + return gCluster.getFileSystem().getAsyncDistributedFileSystem(); } }); @@ -316,24 +242,16 @@ public class TestAsyncDFSRename { Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); returnFuture.get(); } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - } finally { - if (rootFs != null) { - rootFs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } + checkPermissionDenied(e, src); } } - private void checkPermissionDenied(final Exception e, final Path dir, - final String user) { + private void checkPermissionDenied(final Exception e, final Path dir) { assertTrue(e.getCause() instanceof ExecutionException); assertTrue("Permission denied messages must carry AccessControlException", e.getMessage().contains("AccessControlException")); assertTrue("Permission denied messages must carry the username", e - .getMessage().contains(user)); + .getMessage().contains(USER1_NAME)); assertTrue("Permission denied messages must carry the path parent", e .getMessage().contains(dir.getParent().toUri().getPath())); }