Revert "Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou""
This reverts commit 4d36b221a2
.
This commit is contained in:
parent
eded3d109e
commit
aa20fa150d
|
@ -324,6 +324,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||||
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
|
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
|
||||||
4*60*60; // 4 hours
|
4*60*60; // 4 hours
|
||||||
|
|
||||||
|
public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
|
||||||
|
"ipc.client.async.calls.max";
|
||||||
|
public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
|
||||||
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
|
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
|
||||||
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
|
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals that an AsyncCallLimitExceededException has occurred. This class is
|
||||||
|
* used to make application code using async RPC aware that limit of max async
|
||||||
|
* calls is reached, application code need to retrieve results from response of
|
||||||
|
* established async calls to avoid buffer overflow in order for follow-on async
|
||||||
|
* calls going correctly.
|
||||||
|
*/
|
||||||
|
public class AsyncCallLimitExceededException extends IOException {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public AsyncCallLimitExceededException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -159,6 +159,8 @@ public class Client implements AutoCloseable {
|
||||||
|
|
||||||
private final boolean fallbackAllowed;
|
private final boolean fallbackAllowed;
|
||||||
private final byte[] clientId;
|
private final byte[] clientId;
|
||||||
|
private final int maxAsyncCalls;
|
||||||
|
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor on which IPC calls' parameters are sent.
|
* Executor on which IPC calls' parameters are sent.
|
||||||
|
@ -1288,6 +1290,9 @@ public class Client implements AutoCloseable {
|
||||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
||||||
this.clientId = ClientId.getClientId();
|
this.clientId = ClientId.getClientId();
|
||||||
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
|
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
|
||||||
|
this.maxAsyncCalls = conf.getInt(
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1354,6 +1359,20 @@ public class Client implements AutoCloseable {
|
||||||
fallbackToSimpleAuth);
|
fallbackToSimpleAuth);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkAsyncCall() throws IOException {
|
||||||
|
if (isAsynchronousMode()) {
|
||||||
|
if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
|
||||||
|
asyncCallCounter.decrementAndGet();
|
||||||
|
String errMsg = String.format(
|
||||||
|
"Exceeded limit of max asynchronous calls: %d, " +
|
||||||
|
"please configure %s to adjust it.",
|
||||||
|
maxAsyncCalls,
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
|
||||||
|
throw new AsyncCallLimitExceededException(errMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
||||||
* <code>remoteId</code>, returning the rpc response.
|
* <code>remoteId</code>, returning the rpc response.
|
||||||
|
@ -1374,6 +1393,9 @@ public class Client implements AutoCloseable {
|
||||||
final Call call = createCall(rpcKind, rpcRequest);
|
final Call call = createCall(rpcKind, rpcRequest);
|
||||||
final Connection connection = getConnection(remoteId, call, serviceClass,
|
final Connection connection = getConnection(remoteId, call, serviceClass,
|
||||||
fallbackToSimpleAuth);
|
fallbackToSimpleAuth);
|
||||||
|
|
||||||
|
try {
|
||||||
|
checkAsyncCall();
|
||||||
try {
|
try {
|
||||||
connection.sendRpcRequest(call); // send the rpc request
|
connection.sendRpcRequest(call); // send the rpc request
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
|
@ -1383,15 +1405,26 @@ public class Client implements AutoCloseable {
|
||||||
LOG.warn("interrupted waiting to send rpc request to server", e);
|
LOG.warn("interrupted waiting to send rpc request to server", e);
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
|
} catch(Exception e) {
|
||||||
|
if (isAsynchronousMode()) {
|
||||||
|
releaseAsyncCall();
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
if (isAsynchronousMode()) {
|
if (isAsynchronousMode()) {
|
||||||
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
|
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
|
||||||
|
private final AtomicBoolean callled = new AtomicBoolean(false);
|
||||||
@Override
|
@Override
|
||||||
public Writable get() throws InterruptedException, ExecutionException {
|
public Writable get() throws InterruptedException, ExecutionException {
|
||||||
|
if (callled.compareAndSet(false, true)) {
|
||||||
try {
|
try {
|
||||||
set(getRpcResponse(call, connection));
|
set(getRpcResponse(call, connection));
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
setException(ie);
|
setException(ie);
|
||||||
|
} finally {
|
||||||
|
releaseAsyncCall();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return super.get();
|
return super.get();
|
||||||
}
|
}
|
||||||
|
@ -1427,6 +1460,15 @@ public class Client implements AutoCloseable {
|
||||||
asynchronousMode.set(async);
|
asynchronousMode.set(async);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void releaseAsyncCall() {
|
||||||
|
asyncCallCounter.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getAsyncCallCount() {
|
||||||
|
return asyncCallCounter.get();
|
||||||
|
}
|
||||||
|
|
||||||
private Writable getRpcResponse(final Call call, final Connection connection)
|
private Writable getRpcResponse(final Call call, final Connection connection)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
synchronized (call) {
|
synchronized (call) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -34,6 +35,7 @@ import java.util.concurrent.Future;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
|
@ -54,12 +56,13 @@ public class TestAsyncIPC {
|
||||||
@Before
|
@Before
|
||||||
public void setupConf() {
|
public void setupConf() {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
|
||||||
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
|
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
|
||||||
// set asynchronous mode for main thread
|
// set asynchronous mode for main thread
|
||||||
Client.setAsynchronousMode(true);
|
Client.setAsynchronousMode(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static class SerialCaller extends Thread {
|
static class AsyncCaller extends Thread {
|
||||||
private Client client;
|
private Client client;
|
||||||
private InetSocketAddress server;
|
private InetSocketAddress server;
|
||||||
private int count;
|
private int count;
|
||||||
|
@ -68,11 +71,11 @@ public class TestAsyncIPC {
|
||||||
new HashMap<Integer, Future<LongWritable>>();
|
new HashMap<Integer, Future<LongWritable>>();
|
||||||
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
|
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
|
||||||
|
|
||||||
public SerialCaller(Client client, InetSocketAddress server, int count) {
|
public AsyncCaller(Client client, InetSocketAddress server, int count) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.count = count;
|
this.count = count;
|
||||||
// set asynchronous mode, since SerialCaller extends Thread
|
// set asynchronous mode, since AsyncCaller extends Thread
|
||||||
Client.setAsynchronousMode(true);
|
Client.setAsynchronousMode(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,14 +110,111 @@ public class TestAsyncIPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
static class AsyncLimitlCaller extends Thread {
|
||||||
public void testSerial() throws IOException, InterruptedException,
|
private Client client;
|
||||||
ExecutionException {
|
private InetSocketAddress server;
|
||||||
internalTestSerial(3, false, 2, 5, 100);
|
private int count;
|
||||||
internalTestSerial(3, true, 2, 5, 10);
|
private boolean failed;
|
||||||
|
Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
|
||||||
|
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
|
||||||
|
int start = 0, end = 0;
|
||||||
|
|
||||||
|
int getStart() {
|
||||||
|
return start;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void internalTestSerial(int handlerCount, boolean handlerSleep,
|
int getEnd() {
|
||||||
|
return end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
|
||||||
|
this(0, client, server, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
final int callerId;
|
||||||
|
|
||||||
|
public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
|
||||||
|
int count) {
|
||||||
|
this.client = client;
|
||||||
|
this.server = server;
|
||||||
|
this.count = count;
|
||||||
|
// set asynchronous mode, since AsyncLimitlCaller extends Thread
|
||||||
|
Client.setAsynchronousMode(true);
|
||||||
|
this.callerId = callerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// in case Thread#Start is called, which will spawn new thread
|
||||||
|
Client.setAsynchronousMode(true);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
try {
|
||||||
|
final long param = TestIPC.RANDOM.nextLong();
|
||||||
|
runCall(i, param);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
|
||||||
|
StringUtils.stringifyException(e)));
|
||||||
|
failed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runCall(final int idx, final long param)
|
||||||
|
throws InterruptedException, ExecutionException, IOException {
|
||||||
|
for (;;) {
|
||||||
|
try {
|
||||||
|
doCall(idx, param);
|
||||||
|
return;
|
||||||
|
} catch (AsyncCallLimitExceededException e) {
|
||||||
|
/**
|
||||||
|
* reached limit of async calls, fetch results of finished async calls
|
||||||
|
* to let follow-on calls go
|
||||||
|
*/
|
||||||
|
start = end;
|
||||||
|
end = idx;
|
||||||
|
waitForReturnValues(start, end);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doCall(final int idx, final long param) throws IOException {
|
||||||
|
TestIPC.call(client, param, server, conf);
|
||||||
|
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
|
||||||
|
returnFutures.put(idx, returnFuture);
|
||||||
|
expectedValues.put(idx, param);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForReturnValues(final int start, final int end)
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
LongWritable value = returnFutures.get(i).get();
|
||||||
|
if (expectedValues.get(i) != value.get()) {
|
||||||
|
LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
|
||||||
|
failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAsyncCall() throws IOException, InterruptedException,
|
||||||
|
ExecutionException {
|
||||||
|
internalTestAsyncCall(3, false, 2, 5, 100);
|
||||||
|
internalTestAsyncCall(3, true, 2, 5, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAsyncCallLimit() throws IOException,
|
||||||
|
InterruptedException, ExecutionException {
|
||||||
|
internalTestAsyncCallLimit(100, false, 5, 10, 500);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
|
||||||
int clientCount, int callerCount, int callCount) throws IOException,
|
int clientCount, int callerCount, int callCount) throws IOException,
|
||||||
InterruptedException, ExecutionException {
|
InterruptedException, ExecutionException {
|
||||||
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
|
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
|
||||||
|
@ -126,9 +226,9 @@ public class TestAsyncIPC {
|
||||||
clients[i] = new Client(LongWritable.class, conf);
|
clients[i] = new Client(LongWritable.class, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
SerialCaller[] callers = new SerialCaller[callerCount];
|
AsyncCaller[] callers = new AsyncCaller[callerCount];
|
||||||
for (int i = 0; i < callerCount; i++) {
|
for (int i = 0; i < callerCount; i++) {
|
||||||
callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
|
callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
|
||||||
callers[i].start();
|
callers[i].start();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < callerCount; i++) {
|
for (int i = 0; i < callerCount; i++) {
|
||||||
|
@ -144,6 +244,75 @@ public class TestAsyncIPC {
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
|
||||||
|
InterruptedException, ExecutionException {
|
||||||
|
int handlerCount = 10, callCount = 100;
|
||||||
|
Server server = new TestIPC.TestServer(handlerCount, false, conf);
|
||||||
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
server.start();
|
||||||
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
|
int asyncCallCount = client.getAsyncCallCount();
|
||||||
|
|
||||||
|
try {
|
||||||
|
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
|
||||||
|
caller.run();
|
||||||
|
|
||||||
|
caller.waitForReturnValues();
|
||||||
|
String msg = String.format(
|
||||||
|
"First time, expected not failed for caller: %s.", caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
|
|
||||||
|
caller.waitForReturnValues();
|
||||||
|
assertTrue(asyncCallCount == client.getAsyncCallCount());
|
||||||
|
msg = String.format("Second time, expected not failed for caller: %s.",
|
||||||
|
caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
|
|
||||||
|
assertTrue(asyncCallCount == client.getAsyncCallCount());
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
|
||||||
|
int clientCount, int callerCount, int callCount) throws IOException,
|
||||||
|
InterruptedException, ExecutionException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
|
||||||
|
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
|
||||||
|
|
||||||
|
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
|
||||||
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Client[] clients = new Client[clientCount];
|
||||||
|
for (int i = 0; i < clientCount; i++) {
|
||||||
|
clients[i] = new Client(LongWritable.class, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
|
||||||
|
for (int i = 0; i < callerCount; i++) {
|
||||||
|
callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
|
||||||
|
callCount);
|
||||||
|
callers[i].start();
|
||||||
|
}
|
||||||
|
for (int i = 0; i < callerCount; i++) {
|
||||||
|
callers[i].join();
|
||||||
|
callers[i].waitForReturnValues(callers[i].getStart(),
|
||||||
|
callers[i].getCount());
|
||||||
|
String msg = String.format("Expected not failed for caller-%d: %s.", i,
|
||||||
|
callers[i]);
|
||||||
|
assertFalse(msg, callers[i].failed);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < clientCount; i++) {
|
||||||
|
clients[i].stop();
|
||||||
|
}
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if (1) the rpc server uses the call id/retry provided by the rpc
|
* Test if (1) the rpc server uses the call id/retry provided by the rpc
|
||||||
* client, and (2) the rpc client receives the same call id/retry from the rpc
|
* client, and (2) the rpc client receives the same call id/retry from the rpc
|
||||||
|
@ -196,7 +365,7 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
final SerialCaller caller = new SerialCaller(client, addr, 4);
|
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.waitForReturnValues();
|
caller.waitForReturnValues();
|
||||||
String msg = String.format("Expected not failed for caller: %s.", caller);
|
String msg = String.format("Expected not failed for caller: %s.", caller);
|
||||||
|
@ -235,7 +404,7 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
final SerialCaller caller = new SerialCaller(client, addr, 10);
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.waitForReturnValues();
|
caller.waitForReturnValues();
|
||||||
String msg = String.format("Expected not failed for caller: %s.", caller);
|
String msg = String.format("Expected not failed for caller: %s.", caller);
|
||||||
|
@ -272,7 +441,7 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
final SerialCaller caller = new SerialCaller(client, addr, 10);
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.waitForReturnValues();
|
caller.waitForReturnValues();
|
||||||
String msg = String.format("Expected not failed for caller: %s.", caller);
|
String msg = String.format("Expected not failed for caller: %s.", caller);
|
||||||
|
@ -313,9 +482,9 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
SerialCaller[] callers = new SerialCaller[callerCount];
|
AsyncCaller[] callers = new AsyncCaller[callerCount];
|
||||||
for (int i = 0; i < callerCount; ++i) {
|
for (int i = 0; i < callerCount; ++i) {
|
||||||
callers[i] = new SerialCaller(client, addr, perCallerCallCount);
|
callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
|
||||||
callers[i].start();
|
callers[i].start();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < callerCount; ++i) {
|
for (int i = 0; i < callerCount; ++i) {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
|
@ -50,12 +51,15 @@ public class AsyncDistributedFileSystem {
|
||||||
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
|
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
|
||||||
.getReturnValueCallback();
|
.getReturnValueCallback();
|
||||||
Future<T> returnFuture = new AbstractFuture<T>() {
|
Future<T> returnFuture = new AbstractFuture<T>() {
|
||||||
|
private final AtomicBoolean called = new AtomicBoolean(false);
|
||||||
public T get() throws InterruptedException, ExecutionException {
|
public T get() throws InterruptedException, ExecutionException {
|
||||||
|
if (called.compareAndSet(false, true)) {
|
||||||
try {
|
try {
|
||||||
set(returnValueCallback.call());
|
set(returnValueCallback.call());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
setException(e);
|
setException(e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return super.get();
|
return super.get();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -31,80 +30,25 @@ import java.util.concurrent.Future;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
|
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestAsyncDFSRename {
|
public class TestAsyncDFSRename {
|
||||||
final Path asyncRenameDir = new Path("/test/async_rename/");
|
|
||||||
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
|
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
|
||||||
final private static Configuration CONF = new HdfsConfiguration();
|
|
||||||
|
|
||||||
final private static String GROUP1_NAME = "group1";
|
|
||||||
final private static String GROUP2_NAME = "group2";
|
|
||||||
final private static String USER1_NAME = "user1";
|
|
||||||
private static final UserGroupInformation USER1;
|
|
||||||
|
|
||||||
private MiniDFSCluster gCluster;
|
|
||||||
|
|
||||||
static {
|
|
||||||
// explicitly turn on permission checking
|
|
||||||
CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
|
|
||||||
|
|
||||||
// create fake mapping for the groups
|
|
||||||
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
|
|
||||||
u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
|
|
||||||
DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
|
|
||||||
|
|
||||||
// Initiate all four users
|
|
||||||
USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
|
|
||||||
GROUP1_NAME, GROUP2_NAME });
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws IOException {
|
|
||||||
gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
|
|
||||||
gCluster.waitActive();
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws IOException {
|
|
||||||
if (gCluster != null) {
|
|
||||||
gCluster.shutdown();
|
|
||||||
gCluster = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int countLease(MiniDFSCluster cluster) {
|
|
||||||
return TestDFSRename.countLease(cluster);
|
|
||||||
}
|
|
||||||
|
|
||||||
void list(DistributedFileSystem dfs, String name) throws IOException {
|
|
||||||
FileSystem.LOG.info("\n\n" + name);
|
|
||||||
for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
|
|
||||||
FileSystem.LOG.info("" + s.getPath());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
|
|
||||||
DataOutputStream a_out = dfs.create(f);
|
|
||||||
a_out.writeBytes("something");
|
|
||||||
a_out.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check the blocks of dst file are cleaned after rename with overwrite
|
* Check the blocks of dst file are cleaned after rename with overwrite
|
||||||
* Restart NN to check the rename successfully
|
* Restart NN to check the rename successfully
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout = 60000)
|
||||||
public void testAsyncRenameWithOverwrite() throws Exception {
|
public void testAsyncRenameWithOverwrite() throws Exception {
|
||||||
final short replFactor = 2;
|
final short replFactor = 2;
|
||||||
final long blockSize = 512;
|
final long blockSize = 512;
|
||||||
|
@ -169,26 +113,26 @@ public class TestAsyncDFSRename {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout = 60000)
|
||||||
public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
|
public void testCallGetReturnValueMultipleTimes() throws Exception {
|
||||||
final short replFactor = 2;
|
final short replFactor = 2;
|
||||||
final long blockSize = 512;
|
final long blockSize = 512;
|
||||||
final Path renameDir = new Path(
|
final Path renameDir = new Path(
|
||||||
"/test/concurrent_reanme_with_overwrite_dir/");
|
"/test/testCallGetReturnValueMultipleTimes/");
|
||||||
Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
|
||||||
.build();
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(2).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
|
final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
|
||||||
int count = 1000;
|
final int count = 100;
|
||||||
|
|
||||||
try {
|
|
||||||
long fileLen = blockSize * 3;
|
long fileLen = blockSize * 3;
|
||||||
|
final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
|
||||||
|
|
||||||
assertTrue(dfs.mkdirs(renameDir));
|
assertTrue(dfs.mkdirs(renameDir));
|
||||||
|
|
||||||
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
|
try {
|
||||||
|
|
||||||
// concurrently invoking many rename
|
// concurrently invoking many rename
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
Path src = new Path(renameDir, "src" + i);
|
Path src = new Path(renameDir, "src" + i);
|
||||||
|
@ -199,6 +143,24 @@ public class TestAsyncDFSRename {
|
||||||
returnFutures.put(i, returnFuture);
|
returnFutures.put(i, returnFuture);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
|
||||||
|
renameDir, dfs);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (dfs != null) {
|
||||||
|
dfs.close();
|
||||||
|
}
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyCallGetReturnValueMultipleTimes(
|
||||||
|
Map<Integer, Future<Void>> returnFutures, int count,
|
||||||
|
MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
|
||||||
|
throws InterruptedException, ExecutionException, IOException {
|
||||||
// wait for completing the calls
|
// wait for completing the calls
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
returnFutures.get(i).get();
|
returnFutures.get(i).get();
|
||||||
|
@ -207,6 +169,84 @@ public class TestAsyncDFSRename {
|
||||||
// Restart NN and check the rename successfully
|
// Restart NN and check the rename successfully
|
||||||
cluster.restartNameNodes();
|
cluster.restartNameNodes();
|
||||||
|
|
||||||
|
// very the src dir should not exist, dst should
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Path src = new Path(renameDir, "src" + i);
|
||||||
|
Path dst = new Path(renameDir, "dst" + i);
|
||||||
|
assertFalse(dfs.exists(src));
|
||||||
|
assertTrue(dfs.exists(dst));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testAggressiveConcurrentAsyncRenameWithOverwrite()
|
||||||
|
throws Exception {
|
||||||
|
internalTestConcurrentAsyncRenameWithOverwrite(100,
|
||||||
|
"testAggressiveConcurrentAsyncRenameWithOverwrite");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testConservativeConcurrentAsyncRenameWithOverwrite()
|
||||||
|
throws Exception {
|
||||||
|
internalTestConcurrentAsyncRenameWithOverwrite(10000,
|
||||||
|
"testConservativeConcurrentAsyncRenameWithOverwrite");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalTestConcurrentAsyncRenameWithOverwrite(
|
||||||
|
final int asyncCallLimit, final String basePath) throws Exception {
|
||||||
|
final short replFactor = 2;
|
||||||
|
final long blockSize = 512;
|
||||||
|
final Path renameDir = new Path(String.format("/test/%s/", basePath));
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
|
||||||
|
asyncCallLimit);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
|
||||||
|
int count = 1000;
|
||||||
|
long fileLen = blockSize * 3;
|
||||||
|
int start = 0, end = 0;
|
||||||
|
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
|
||||||
|
|
||||||
|
assertTrue(dfs.mkdirs(renameDir));
|
||||||
|
|
||||||
|
try {
|
||||||
|
// concurrently invoking many rename
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Path src = new Path(renameDir, "src" + i);
|
||||||
|
Path dst = new Path(renameDir, "dst" + i);
|
||||||
|
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
|
||||||
|
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
|
||||||
|
for (;;) {
|
||||||
|
try {
|
||||||
|
LOG.info("rename #" + i);
|
||||||
|
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
|
||||||
|
returnFutures.put(i, returnFuture);
|
||||||
|
break;
|
||||||
|
} catch (AsyncCallLimitExceededException e) {
|
||||||
|
/**
|
||||||
|
* reached limit of async calls, fetch results of finished async
|
||||||
|
* calls to let follow-on calls go
|
||||||
|
*/
|
||||||
|
LOG.error(e);
|
||||||
|
start = end;
|
||||||
|
end = i;
|
||||||
|
LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
|
||||||
|
waitForReturnValues(returnFutures, start, end);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for completing the calls
|
||||||
|
for (int i = start; i < count; i++) {
|
||||||
|
returnFutures.get(i).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restart NN and check the rename successfully
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
|
||||||
// very the src dir should not exist, dst should
|
// very the src dir should not exist, dst should
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
Path src = new Path(renameDir, "src" + i);
|
Path src = new Path(renameDir, "src" + i);
|
||||||
|
@ -215,26 +255,60 @@ public class TestAsyncDFSRename {
|
||||||
assertTrue(dfs.exists(dst));
|
assertTrue(dfs.exists(dst));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
dfs.delete(renameDir, true);
|
if (dfs != null) {
|
||||||
|
dfs.close();
|
||||||
|
}
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
private void waitForReturnValues(
|
||||||
|
final Map<Integer, Future<Void>> returnFutures, final int start,
|
||||||
|
final int end) throws InterruptedException, ExecutionException {
|
||||||
|
LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
LOG.info("calling Future#get #" + i);
|
||||||
|
returnFutures.get(i).get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
public void testAsyncRenameWithException() throws Exception {
|
public void testAsyncRenameWithException() throws Exception {
|
||||||
FileSystem rootFs = FileSystem.get(CONF);
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
String group1 = "group1";
|
||||||
|
String group2 = "group2";
|
||||||
|
String user1 = "user1";
|
||||||
|
UserGroupInformation ugi1;
|
||||||
|
|
||||||
|
// explicitly turn on permission checking
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
|
||||||
|
|
||||||
|
// create fake mapping for the groups
|
||||||
|
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
|
||||||
|
u2g_map.put(user1, new String[] { group1, group2 });
|
||||||
|
DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
|
||||||
|
|
||||||
|
// Initiate all four users
|
||||||
|
ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
|
||||||
|
group1, group2 });
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
FileSystem rootFs = FileSystem.get(conf);
|
||||||
final Path renameDir = new Path("/test/async_rename_exception/");
|
final Path renameDir = new Path("/test/async_rename_exception/");
|
||||||
final Path src = new Path(renameDir, "src");
|
final Path src = new Path(renameDir, "src");
|
||||||
final Path dst = new Path(renameDir, "dst");
|
final Path dst = new Path(renameDir, "dst");
|
||||||
rootFs.mkdirs(src);
|
rootFs.mkdirs(src);
|
||||||
|
|
||||||
AsyncDistributedFileSystem adfs = USER1
|
AsyncDistributedFileSystem adfs = ugi1
|
||||||
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
|
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
|
||||||
@Override
|
@Override
|
||||||
public AsyncDistributedFileSystem run() throws Exception {
|
public AsyncDistributedFileSystem run() throws Exception {
|
||||||
return gCluster.getFileSystem().getAsyncDistributedFileSystem();
|
return cluster.getFileSystem().getAsyncDistributedFileSystem();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -242,16 +316,24 @@ public class TestAsyncDFSRename {
|
||||||
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
|
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
|
||||||
returnFuture.get();
|
returnFuture.get();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
checkPermissionDenied(e, src);
|
checkPermissionDenied(e, src, user1);
|
||||||
|
} finally {
|
||||||
|
if (rootFs != null) {
|
||||||
|
rootFs.close();
|
||||||
|
}
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkPermissionDenied(final Exception e, final Path dir) {
|
private void checkPermissionDenied(final Exception e, final Path dir,
|
||||||
|
final String user) {
|
||||||
assertTrue(e.getCause() instanceof ExecutionException);
|
assertTrue(e.getCause() instanceof ExecutionException);
|
||||||
assertTrue("Permission denied messages must carry AccessControlException",
|
assertTrue("Permission denied messages must carry AccessControlException",
|
||||||
e.getMessage().contains("AccessControlException"));
|
e.getMessage().contains("AccessControlException"));
|
||||||
assertTrue("Permission denied messages must carry the username", e
|
assertTrue("Permission denied messages must carry the username", e
|
||||||
.getMessage().contains(USER1_NAME));
|
.getMessage().contains(user));
|
||||||
assertTrue("Permission denied messages must carry the path parent", e
|
assertTrue("Permission denied messages must carry the path parent", e
|
||||||
.getMessage().contains(dir.getParent().toUri().getPath()));
|
.getMessage().contains(dir.getParent().toUri().getPath()));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue