Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou"

This reverts commit 1b9f18623a.
This commit is contained in:
Andrew Wang 2016-06-03 18:09:18 -07:00
parent f23d5dfc60
commit 4d36b221a2
6 changed files with 113 additions and 449 deletions

View File

@ -324,9 +324,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT = public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours 4*60*60; // 4 hours
public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
"ipc.client.async.calls.max";
public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed"; public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;

View File

@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
/**
* Signals that an AsyncCallLimitExceededException has occurred. This class is
* used to make application code using async RPC aware that limit of max async
* calls is reached, application code need to retrieve results from response of
* established async calls to avoid buffer overflow in order for follow-on async
* calls going correctly.
*/
public class AsyncCallLimitExceededException extends IOException {
private static final long serialVersionUID = 1L;
public AsyncCallLimitExceededException(String message) {
super(message);
}
}

View File

@ -159,8 +159,6 @@ public class Client implements AutoCloseable {
private final boolean fallbackAllowed; private final boolean fallbackAllowed;
private final byte[] clientId; private final byte[] clientId;
private final int maxAsyncCalls;
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
/** /**
* Executor on which IPC calls' parameters are sent. * Executor on which IPC calls' parameters are sent.
@ -1290,9 +1288,6 @@ public class Client implements AutoCloseable {
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId(); this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance(); this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
} }
/** /**
@ -1359,20 +1354,6 @@ public class Client implements AutoCloseable {
fallbackToSimpleAuth); fallbackToSimpleAuth);
} }
private void checkAsyncCall() throws IOException {
if (isAsynchronousMode()) {
if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
asyncCallCounter.decrementAndGet();
String errMsg = String.format(
"Exceeded limit of max asynchronous calls: %d, " +
"please configure %s to adjust it.",
maxAsyncCalls,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
throw new AsyncCallLimitExceededException(errMsg);
}
}
}
/** /**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response. * <code>remoteId</code>, returning the rpc response.
@ -1393,9 +1374,6 @@ public class Client implements AutoCloseable {
final Call call = createCall(rpcKind, rpcRequest); final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass, final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth); fallbackToSimpleAuth);
try {
checkAsyncCall();
try { try {
connection.sendRpcRequest(call); // send the rpc request connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
@ -1405,26 +1383,15 @@ public class Client implements AutoCloseable {
LOG.warn("interrupted waiting to send rpc request to server", e); LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e); throw new IOException(e);
} }
} catch(Exception e) {
if (isAsynchronousMode()) {
releaseAsyncCall();
}
throw e;
}
if (isAsynchronousMode()) { if (isAsynchronousMode()) {
Future<Writable> returnFuture = new AbstractFuture<Writable>() { Future<Writable> returnFuture = new AbstractFuture<Writable>() {
private final AtomicBoolean callled = new AtomicBoolean(false);
@Override @Override
public Writable get() throws InterruptedException, ExecutionException { public Writable get() throws InterruptedException, ExecutionException {
if (callled.compareAndSet(false, true)) {
try { try {
set(getRpcResponse(call, connection)); set(getRpcResponse(call, connection));
} catch (IOException ie) { } catch (IOException ie) {
setException(ie); setException(ie);
} finally {
releaseAsyncCall();
}
} }
return super.get(); return super.get();
} }
@ -1460,15 +1427,6 @@ public class Client implements AutoCloseable {
asynchronousMode.set(async); asynchronousMode.set(async);
} }
private void releaseAsyncCall() {
asyncCallCounter.decrementAndGet();
}
@VisibleForTesting
int getAsyncCallCount() {
return asyncCallCounter.get();
}
private Writable getRpcResponse(final Call call, final Connection connection) private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException { throws IOException {
synchronized (call) { synchronized (call) {

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -35,7 +34,6 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
@ -56,13 +54,12 @@ public class TestAsyncIPC {
@Before @Before
public void setupConf() { public void setupConf() {
conf = new Configuration(); conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL); Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread // set asynchronous mode for main thread
Client.setAsynchronousMode(true); Client.setAsynchronousMode(true);
} }
static class AsyncCaller extends Thread { protected static class SerialCaller extends Thread {
private Client client; private Client client;
private InetSocketAddress server; private InetSocketAddress server;
private int count; private int count;
@ -71,11 +68,11 @@ public class TestAsyncIPC {
new HashMap<Integer, Future<LongWritable>>(); new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>(); Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
public AsyncCaller(Client client, InetSocketAddress server, int count) { public SerialCaller(Client client, InetSocketAddress server, int count) {
this.client = client; this.client = client;
this.server = server; this.server = server;
this.count = count; this.count = count;
// set asynchronous mode, since AsyncCaller extends Thread // set asynchronous mode, since SerialCaller extends Thread
Client.setAsynchronousMode(true); Client.setAsynchronousMode(true);
} }
@ -110,111 +107,14 @@ public class TestAsyncIPC {
} }
} }
static class AsyncLimitlCaller extends Thread { @Test
private Client client; public void testSerial() throws IOException, InterruptedException,
private InetSocketAddress server;
private int count;
private boolean failed;
Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
int start = 0, end = 0;
int getStart() {
return start;
}
int getEnd() {
return end;
}
int getCount() {
return count;
}
public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
this(0, client, server, count);
}
final int callerId;
public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
int count) {
this.client = client;
this.server = server;
this.count = count;
// set asynchronous mode, since AsyncLimitlCaller extends Thread
Client.setAsynchronousMode(true);
this.callerId = callerId;
}
@Override
public void run() {
// in case Thread#Start is called, which will spawn new thread
Client.setAsynchronousMode(true);
for (int i = 0; i < count; i++) {
try {
final long param = TestIPC.RANDOM.nextLong();
runCall(i, param);
} catch (Exception e) {
LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
StringUtils.stringifyException(e)));
failed = true;
}
}
}
private void runCall(final int idx, final long param)
throws InterruptedException, ExecutionException, IOException {
for (;;) {
try {
doCall(idx, param);
return;
} catch (AsyncCallLimitExceededException e) {
/**
* reached limit of async calls, fetch results of finished async calls
* to let follow-on calls go
*/
start = end;
end = idx;
waitForReturnValues(start, end);
}
}
}
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
private void waitForReturnValues(final int start, final int end)
throws InterruptedException, ExecutionException {
for (int i = start; i < end; i++) {
LongWritable value = returnFutures.get(i).get();
if (expectedValues.get(i) != value.get()) {
LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
failed = true;
break;
}
}
}
}
@Test(timeout = 60000)
public void testAsyncCall() throws IOException, InterruptedException,
ExecutionException { ExecutionException {
internalTestAsyncCall(3, false, 2, 5, 100); internalTestSerial(3, false, 2, 5, 100);
internalTestAsyncCall(3, true, 2, 5, 10); internalTestSerial(3, true, 2, 5, 10);
} }
@Test(timeout = 60000) public void internalTestSerial(int handlerCount, boolean handlerSleep,
public void testAsyncCallLimit() throws IOException,
InterruptedException, ExecutionException {
internalTestAsyncCallLimit(100, false, 5, 10, 500);
}
public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException, int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException { InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf); Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@ -226,9 +126,9 @@ public class TestAsyncIPC {
clients[i] = new Client(LongWritable.class, conf); clients[i] = new Client(LongWritable.class, conf);
} }
AsyncCaller[] callers = new AsyncCaller[callerCount]; SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; i++) { for (int i = 0; i < callerCount; i++) {
callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount); callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
callers[i].start(); callers[i].start();
} }
for (int i = 0; i < callerCount; i++) { for (int i = 0; i < callerCount; i++) {
@ -244,75 +144,6 @@ public class TestAsyncIPC {
server.stop(); server.stop();
} }
@Test(timeout = 60000)
public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
InterruptedException, ExecutionException {
int handlerCount = 10, callCount = 100;
Server server = new TestIPC.TestServer(handlerCount, false, conf);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final Client client = new Client(LongWritable.class, conf);
int asyncCallCount = client.getAsyncCallCount();
try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run();
caller.waitForReturnValues();
String msg = String.format(
"First time, expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
caller.waitForReturnValues();
assertTrue(asyncCallCount == client.getAsyncCallCount());
msg = String.format("Second time, expected not failed for caller: %s.",
caller);
assertFalse(msg, caller.failed);
assertTrue(asyncCallCount == client.getAsyncCallCount());
} finally {
client.stop();
server.stop();
}
}
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
Client[] clients = new Client[clientCount];
for (int i = 0; i < clientCount; i++) {
clients[i] = new Client(LongWritable.class, conf);
}
AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
callers[i].waitForReturnValues(callers[i].getStart(),
callers[i].getCount());
String msg = String.format("Expected not failed for caller-%d: %s.", i,
callers[i]);
assertFalse(msg, callers[i].failed);
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
}
server.stop();
}
/** /**
* Test if (1) the rpc server uses the call id/retry provided by the rpc * Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc * client, and (2) the rpc client receives the same call id/retry from the rpc
@ -365,7 +196,7 @@ public class TestAsyncIPC {
try { try {
InetSocketAddress addr = NetUtils.getConnectAddress(server); InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start(); server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 4); final SerialCaller caller = new SerialCaller(client, addr, 4);
caller.run(); caller.run();
caller.waitForReturnValues(); caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller); String msg = String.format("Expected not failed for caller: %s.", caller);
@ -404,7 +235,7 @@ public class TestAsyncIPC {
try { try {
InetSocketAddress addr = NetUtils.getConnectAddress(server); InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start(); server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10); final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run(); caller.run();
caller.waitForReturnValues(); caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller); String msg = String.format("Expected not failed for caller: %s.", caller);
@ -441,7 +272,7 @@ public class TestAsyncIPC {
try { try {
InetSocketAddress addr = NetUtils.getConnectAddress(server); InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start(); server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10); final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run(); caller.run();
caller.waitForReturnValues(); caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller); String msg = String.format("Expected not failed for caller: %s.", caller);
@ -482,9 +313,9 @@ public class TestAsyncIPC {
try { try {
InetSocketAddress addr = NetUtils.getConnectAddress(server); InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start(); server.start();
AsyncCaller[] callers = new AsyncCaller[callerCount]; SerialCaller[] callers = new SerialCaller[callerCount];
for (int i = 0; i < callerCount; ++i) { for (int i = 0; i < callerCount; ++i) {
callers[i] = new AsyncCaller(client, addr, perCallerCallCount); callers[i] = new SerialCaller(client, addr, perCallerCallCount);
callers[i].start(); callers[i].start();
} }
for (int i = 0; i < callerCount; ++i) { for (int i = 0; i < callerCount; ++i) {

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
@ -51,15 +50,12 @@ public class AsyncDistributedFileSystem {
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback(); .getReturnValueCallback();
Future<T> returnFuture = new AbstractFuture<T>() { Future<T> returnFuture = new AbstractFuture<T>() {
private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException { public T get() throws InterruptedException, ExecutionException {
if (called.compareAndSet(false, true)) {
try { try {
set(returnValueCallback.call()); set(returnValueCallback.call());
} catch (Exception e) { } catch (Exception e) {
setException(e); setException(e);
} }
}
return super.get(); return super.get();
} }
}; };

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.HashMap; import java.util.HashMap;
@ -30,25 +31,80 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestAsyncDFSRename { public class TestAsyncDFSRename {
final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
final private static Configuration CONF = new HdfsConfiguration();
final private static String GROUP1_NAME = "group1";
final private static String GROUP2_NAME = "group2";
final private static String USER1_NAME = "user1";
private static final UserGroupInformation USER1;
private MiniDFSCluster gCluster;
static {
// explicitly turn on permission checking
CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
// Initiate all four users
USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
GROUP1_NAME, GROUP2_NAME });
}
@Before
public void setUp() throws IOException {
gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
gCluster.waitActive();
}
@After
public void tearDown() throws IOException {
if (gCluster != null) {
gCluster.shutdown();
gCluster = null;
}
}
static int countLease(MiniDFSCluster cluster) {
return TestDFSRename.countLease(cluster);
}
void list(DistributedFileSystem dfs, String name) throws IOException {
FileSystem.LOG.info("\n\n" + name);
for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
FileSystem.LOG.info("" + s.getPath());
}
}
static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
DataOutputStream a_out = dfs.create(f);
a_out.writeBytes("something");
a_out.close();
}
/** /**
* Check the blocks of dst file are cleaned after rename with overwrite * Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully * Restart NN to check the rename successfully
*/ */
@Test(timeout = 60000) @Test
public void testAsyncRenameWithOverwrite() throws Exception { public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2; final short replFactor = 2;
final long blockSize = 512; final long blockSize = 512;
@ -113,134 +169,38 @@ public class TestAsyncDFSRename {
} }
} }
@Test(timeout = 60000) @Test
public void testCallGetReturnValueMultipleTimes() throws Exception { public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2; final short replFactor = 2;
final long blockSize = 512; final long blockSize = 512;
final Path renameDir = new Path( final Path renameDir = new Path(
"/test/testCallGetReturnValueMultipleTimes/"); "/test/concurrent_reanme_with_overwrite_dir/");
final Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
final int count = 100;
long fileLen = blockSize * 3;
final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
try {
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFutures.put(i, returnFuture);
}
for (int i = 0; i < 5; i++) {
verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
renameDir, dfs);
}
} finally {
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private void verifyCallGetReturnValueMultipleTimes(
Map<Integer, Future<Void>> returnFutures, int count,
MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
throws InterruptedException, ExecutionException, IOException {
// wait for completing the calls
for (int i = 0; i < count; i++) {
returnFutures.get(i).get();
}
// Restart NN and check the rename successfully
cluster.restartNameNodes();
// very the src dir should not exist, dst should
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
assertFalse(dfs.exists(src));
assertTrue(dfs.exists(dst));
}
}
@Test(timeout = 120000)
public void testAggressiveConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(100,
"testAggressiveConcurrentAsyncRenameWithOverwrite");
}
@Test(timeout = 60000)
public void testConservativeConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(10000,
"testConservativeConcurrentAsyncRenameWithOverwrite");
}
private void internalTestConcurrentAsyncRenameWithOverwrite(
final int asyncCallLimit, final String basePath) throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(String.format("/test/%s/", basePath));
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
asyncCallLimit);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build(); .build();
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem(); DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000; int count = 1000;
long fileLen = blockSize * 3;
int start = 0, end = 0;
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
try { try {
long fileLen = blockSize * 3;
assertTrue(dfs.mkdirs(renameDir));
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
// concurrently invoking many rename // concurrently invoking many rename
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i); Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i); Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
for (;;) {
try {
LOG.info("rename #" + i);
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFutures.put(i, returnFuture); returnFutures.put(i, returnFuture);
break;
} catch (AsyncCallLimitExceededException e) {
/**
* reached limit of async calls, fetch results of finished async
* calls to let follow-on calls go
*/
LOG.error(e);
start = end;
end = i;
LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
waitForReturnValues(returnFutures, start, end);
}
}
} }
// wait for completing the calls // wait for completing the calls
for (int i = start; i < count; i++) { for (int i = 0; i < count; i++) {
returnFutures.get(i).get(); returnFutures.get(i).get();
} }
@ -255,60 +215,26 @@ public class TestAsyncDFSRename {
assertTrue(dfs.exists(dst)); assertTrue(dfs.exists(dst));
} }
} finally { } finally {
if (dfs != null) { dfs.delete(renameDir, true);
dfs.close();
}
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
} }
} }
private void waitForReturnValues( @Test
final Map<Integer, Future<Void>> returnFutures, final int start,
final int end) throws InterruptedException, ExecutionException {
LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
for (int i = start; i < end; i++) {
LOG.info("calling Future#get #" + i);
returnFutures.get(i).get();
}
}
@Test(timeout = 60000)
public void testAsyncRenameWithException() throws Exception { public void testAsyncRenameWithException() throws Exception {
Configuration conf = new HdfsConfiguration(); FileSystem rootFs = FileSystem.get(CONF);
String group1 = "group1";
String group2 = "group2";
String user1 = "user1";
UserGroupInformation ugi1;
// explicitly turn on permission checking
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(user1, new String[] { group1, group2 });
DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
// Initiate all four users
ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
group1, group2 });
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3).build();
cluster.waitActive();
FileSystem rootFs = FileSystem.get(conf);
final Path renameDir = new Path("/test/async_rename_exception/"); final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src"); final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst"); final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src); rootFs.mkdirs(src);
AsyncDistributedFileSystem adfs = ugi1 AsyncDistributedFileSystem adfs = USER1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() { .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override @Override
public AsyncDistributedFileSystem run() throws Exception { public AsyncDistributedFileSystem run() throws Exception {
return cluster.getFileSystem().getAsyncDistributedFileSystem(); return gCluster.getFileSystem().getAsyncDistributedFileSystem();
} }
}); });
@ -316,24 +242,16 @@ public class TestAsyncDFSRename {
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get(); returnFuture.get();
} catch (ExecutionException e) { } catch (ExecutionException e) {
checkPermissionDenied(e, src, user1); checkPermissionDenied(e, src);
} finally {
if (rootFs != null) {
rootFs.close();
}
if (cluster != null) {
cluster.shutdown();
}
} }
} }
private void checkPermissionDenied(final Exception e, final Path dir, private void checkPermissionDenied(final Exception e, final Path dir) {
final String user) {
assertTrue(e.getCause() instanceof ExecutionException); assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException", assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException")); e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e assertTrue("Permission denied messages must carry the username", e
.getMessage().contains(user)); .getMessage().contains(USER1_NAME));
assertTrue("Permission denied messages must carry the path parent", e assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath())); .getMessage().contains(dir.getParent().toUri().getPath()));
} }