Revert "Revert "HADOOP-13226 Support async call retry and failover.""

This reverts commit 5360da8bd9.
This commit is contained in:
Tsz-Wo Nicholas Sze 2016-06-06 16:31:43 +08:00
parent 7e7b1ae037
commit 35f255b03b
14 changed files with 775 additions and 114 deletions

View File

@ -345,7 +345,13 @@
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
<!-- Synchronization performed on util.concurrent instance. -->
<!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
<Match>
<Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
<Method name="wait" />
<Bug pattern="WA_NOT_IN_LOOP" />
</Match>
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="stop" />

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.retry;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/** Handle async calls. */
@InterfaceAudience.Private
public class AsyncCallHandler {
static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
private static final ThreadLocal<AsyncGet<?, Exception>>
LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<Object, Throwable>>
ASYNC_RETURN = new ThreadLocal<>();
/** @return the async return value from {@link AsyncCallHandler}. */
@InterfaceStability.Unstable
@SuppressWarnings("unchecked")
public static <R, T extends Throwable> AsyncGet<R, T> getAsyncReturn() {
final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
if (asyncGet != null) {
ASYNC_RETURN.set(null);
return asyncGet;
} else {
return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
}
}
/** For the lower rpc layers to set the async return value. */
@InterfaceStability.Unstable
public static void setLowerLayerAsyncReturn(
AsyncGet<?, Exception> asyncReturn) {
LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
}
private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
Preconditions.checkNotNull(asyncGet);
LOWER_LAYER_ASYNC_RETURN.set(null);
return asyncGet;
}
/** A simple concurrent queue which keeping track the empty start time. */
static class ConcurrentQueue<T> {
private final Queue<T> queue = new LinkedList<>();
private long emptyStartTime = Time.monotonicNow();
synchronized int size() {
return queue.size();
}
/** Is the queue empty for more than the given time in millisecond? */
synchronized boolean isEmpty(long time) {
return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
}
synchronized void offer(T c) {
final boolean added = queue.offer(c);
Preconditions.checkState(added);
}
synchronized T poll() {
Preconditions.checkState(!queue.isEmpty());
final T t = queue.poll();
if (queue.isEmpty()) {
emptyStartTime = Time.monotonicNow();
}
return t;
}
}
/** A queue for handling async calls. */
static class AsyncCallQueue {
private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
private final Processor processor = new Processor();
void addCall(AsyncCall call) {
if (LOG.isDebugEnabled()) {
LOG.debug("add " + call);
}
queue.offer(call);
processor.tryStart();
}
void checkCalls() {
final int size = queue.size();
for (int i = 0; i < size; i++) {
final AsyncCall c = queue.poll();
if (!c.isDone()) {
queue.offer(c); // the call is not done yet, add it back.
}
}
}
/** Process the async calls in the queue. */
private class Processor {
static final long GRACE_PERIOD = 10*1000L;
static final long SLEEP_PERIOD = 100L;
private final AtomicReference<Thread> running = new AtomicReference<>();
boolean isRunning(Daemon d) {
return d == running.get();
}
void tryStart() {
final Thread current = Thread.currentThread();
if (running.compareAndSet(null, current)) {
final Daemon daemon = new Daemon() {
@Override
public void run() {
for (; isRunning(this);) {
try {
Thread.sleep(SLEEP_PERIOD);
} catch (InterruptedException e) {
kill(this);
return;
}
checkCalls();
tryStop(this);
}
}
};
final boolean set = running.compareAndSet(current, daemon);
Preconditions.checkState(set);
if (LOG.isDebugEnabled()) {
LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
}
daemon.start();
}
}
void tryStop(Daemon d) {
if (queue.isEmpty(GRACE_PERIOD)) {
kill(d);
}
}
void kill(Daemon d) {
if (LOG.isDebugEnabled()) {
LOG.debug("Killing " + d);
}
final boolean set = running.compareAndSet(d, null);
Preconditions.checkState(set);
}
}
}
static class AsyncValue<V> {
private V value;
synchronized V waitAsyncValue(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (value != null) {
return value;
}
AsyncGet.Util.wait(this, timeout, unit);
if (value != null) {
return value;
}
throw new TimeoutException("waitCallReturn timed out "
+ timeout + " " + unit);
}
synchronized void set(V v) {
Preconditions.checkNotNull(v);
Preconditions.checkState(value == null);
value = v;
notify();
}
synchronized boolean isDone() {
return value != null;
}
}
static class AsyncCall extends RetryInvocationHandler.Call {
private final AsyncCallHandler asyncCallHandler;
private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
private AsyncGet<?, Exception> lowerLayerAsyncGet;
AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
RetryInvocationHandler.Counters counters,
RetryInvocationHandler<?> retryInvocationHandler,
AsyncCallHandler asyncCallHandler) {
super(method, args, isRpc, callId, counters, retryInvocationHandler);
this.asyncCallHandler = asyncCallHandler;
}
/** @return true if the call is done; otherwise, return false. */
boolean isDone() {
final CallReturn r = invokeOnce();
switch (r.getState()) {
case RETURNED:
case EXCEPTION:
asyncCallReturn.set(r); // the async call is done
return true;
case RETRY:
invokeOnce();
break;
case ASYNC_CALL_IN_PROGRESS:
case ASYNC_INVOKED:
// nothing to do
break;
default:
Preconditions.checkState(false);
}
return false;
}
@Override
CallReturn invoke() throws Throwable {
LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
if (lowerLayerAsyncGet != null) {
// async call was submitted early, check the lower level async call
final boolean isDone = lowerLayerAsyncGet.isDone();
LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
if (!isDone) {
return CallReturn.ASYNC_CALL_IN_PROGRESS;
}
try {
return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
} finally {
lowerLayerAsyncGet = null;
}
}
// submit a new async call
LOG.trace("invoke: ASYNC_INVOKED");
final boolean mode = Client.isAsynchronousMode();
try {
Client.setAsynchronousMode(true);
final Object r = invokeMethod();
// invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
Preconditions.checkState(r == null);
lowerLayerAsyncGet = getLowerLayerAsyncReturn();
if (counters.isZeros()) {
// first async attempt, initialize
LOG.trace("invoke: initAsyncCall");
asyncCallHandler.initAsyncCall(this, asyncCallReturn);
}
return CallReturn.ASYNC_INVOKED;
} finally {
Client.setAsynchronousMode(mode);
}
}
}
private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
private volatile boolean hasSuccessfulCall = false;
AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
int callId, RetryInvocationHandler.Counters counters,
RetryInvocationHandler<?> retryInvocationHandler) {
return new AsyncCall(method, args, isRpc, callId, counters,
retryInvocationHandler, this);
}
boolean hasSuccessfulCall() {
return hasSuccessfulCall;
}
private void initAsyncCall(final AsyncCall asyncCall,
final AsyncValue<CallReturn> asyncCallReturn) {
asyncCalls.addCall(asyncCall);
final AsyncGet<Object, Throwable> asyncGet
= new AsyncGet<Object, Throwable>() {
@Override
public Object get(long timeout, TimeUnit unit) throws Throwable {
final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
final Object r = c.getReturnValue();
hasSuccessfulCall = true;
return r;
}
@Override
public boolean isDone() {
return asyncCallReturn.isDone();
}
};
ASYNC_RETURN.set(asyncGet);
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.retry;
import com.google.common.base.Preconditions;
/** The call return from a method invocation. */
class CallReturn {
/** The return state. */
enum State {
/** Call is returned successfully. */
RETURNED,
/** Call throws an exception. */
EXCEPTION,
/** Call should be retried according to the {@link RetryPolicy}. */
RETRY,
/** Call, which is async, is still in progress. */
ASYNC_CALL_IN_PROGRESS,
/** Call, which is async, just has been invoked. */
ASYNC_INVOKED
}
static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
State.ASYNC_CALL_IN_PROGRESS);
static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
static final CallReturn RETRY = new CallReturn(State.RETRY);
private final Object returnValue;
private final Throwable thrown;
private final State state;
CallReturn(Object r) {
this(r, null, State.RETURNED);
}
CallReturn(Throwable t) {
this(null, t, State.EXCEPTION);
Preconditions.checkNotNull(t);
}
private CallReturn(State s) {
this(null, null, s);
}
private CallReturn(Object r, Throwable t, State s) {
Preconditions.checkArgument(r == null || t == null);
returnValue = r;
thrown = t;
state = s;
}
State getState() {
return state;
}
Object getReturnValue() throws Throwable {
if (state == State.EXCEPTION) {
throw thrown;
}
Preconditions.checkState(state == State.RETURNED, "state == %s", state);
return returnValue;
}
}

View File

@ -42,11 +42,83 @@
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private static class Counters {
static class Call {
private final Method method;
private final Object[] args;
private final boolean isRpc;
private final int callId;
final Counters counters;
private final RetryPolicy retryPolicy;
private final RetryInvocationHandler<?> retryInvocationHandler;
Call(Method method, Object[] args, boolean isRpc, int callId,
Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
this.method = method;
this.args = args;
this.isRpc = isRpc;
this.callId = callId;
this.counters = counters;
this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
this.retryInvocationHandler = retryInvocationHandler;
}
/** Invoke the call once without retrying. */
synchronized CallReturn invokeOnce() {
try {
// The number of times this invocation handler has ever been failed over
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
final long failoverCount = retryInvocationHandler.getFailoverCount();
try {
return invoke();
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace(this, e);
}
if (Thread.currentThread().isInterrupted()) {
// If interrupted, do not retry.
throw e;
}
retryInvocationHandler.handleException(
method, retryPolicy, failoverCount, counters, e);
return CallReturn.RETRY;
}
} catch(Throwable t) {
return new CallReturn(t);
}
}
CallReturn invoke() throws Throwable {
return new CallReturn(invokeMethod());
}
Object invokeMethod() throws Throwable {
if (isRpc) {
Client.setCallIdAndRetryCount(callId, counters.retries);
}
return retryInvocationHandler.invokeMethod(method, args);
}
@Override
public String toString() {
return getClass().getSimpleName() + "#" + callId + ": "
+ method.getDeclaringClass().getSimpleName() + "." + method.getName()
+ "(" + (args == null || args.length == 0? "": Arrays.toString(args))
+ ")";
}
}
static class Counters {
/** Counter for retries. */
private int retries;
/** Counter for method invocation has been failed over. */
private int failovers;
boolean isZeros() {
return retries == 0 && failovers == 0;
}
}
private static class ProxyDescriptor<T> {
@ -144,11 +216,13 @@ static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
private final ProxyDescriptor<T> proxyDescriptor;
private volatile boolean hasMadeASuccessfulCall = false;
private volatile boolean hasSuccessfulCall = false;
private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap;
private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy retryPolicy) {
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@ -167,38 +241,35 @@ private RetryPolicy getRetryPolicy(Method method) {
return policy != null? policy: defaultPolicy;
}
private long getFailoverCount() {
return proxyDescriptor.getFailoverCount();
}
private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
Counters counters) {
if (Client.isAsynchronousMode()) {
return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
counters, this);
} else {
return new Call(method, args, isRpc, callId, counters, this);
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
return invoke(method, args, isRpc, callId, new Counters());
}
private Object invoke(final Method method, final Object[] args,
final boolean isRpc, final int callId, final Counters counters)
throws Throwable {
final RetryPolicy policy = getRetryPolicy(method);
final Counters counters = new Counters();
final Call call = newCall(method, args, isRpc, callId, counters);
while (true) {
// The number of times this invocation handler has ever been failed over,
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
final long failoverCount = proxyDescriptor.getFailoverCount();
if (isRpc) {
Client.setCallIdAndRetryCount(callId, counters.retries);
}
try {
final Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true;
return ret;
} catch (Exception ex) {
if (Thread.currentThread().isInterrupted()) {
// If interrupted, do not retry.
throw ex;
}
handleException(method, policy, failoverCount, counters, ex);
final CallReturn c = call.invokeOnce();
final CallReturn.State state = c.getState();
if (state == CallReturn.State.ASYNC_INVOKED) {
return null; // return null for async calls
} else if (c.getState() != CallReturn.State.RETRY) {
return c.getReturnValue();
}
}
}
@ -239,7 +310,8 @@ private void log(final Method method, final boolean isFailover,
final int failovers, final long delay, final Exception ex) {
// log info if this has made some successful calls or
// this is not the first failover
final boolean info = hasMadeASuccessfulCall || failovers != 0;
final boolean info = hasSuccessfulCall || failovers != 0
|| asyncCallHandler.hasSuccessfulCall();
if (!info && !LOG.isDebugEnabled()) {
return;
}
@ -265,7 +337,9 @@ protected Object invokeMethod(Method method, Object[] args) throws Throwable {
if (!method.isAccessible()) {
method.setAccessible(true);
}
return method.invoke(proxyDescriptor.getProxy(), args);
final Object r = method.invoke(proxyDescriptor.getProxy(), args);
hasSuccessfulCall = true;
return r;
} catch (InvocationTargetException e) {
throw e.getCause();
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.retry;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
@ -649,6 +650,7 @@ public RetryAction shouldRetry(Exception e, int retries,
}
if (e instanceof ConnectException ||
e instanceof EOFException ||
e instanceof NoRouteToHostException ||
e instanceof UnknownHostException ||
e instanceof StandbyException ||

View File

@ -58,7 +58,6 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
@ -94,8 +93,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
= new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@ -106,8 +105,9 @@ protected Boolean initialValue() {
@SuppressWarnings("unchecked")
@Unstable
public static <T> Future<T> getAsyncRpcResponse() {
return (Future<T>) ASYNC_RPC_RESPONSE.get();
public static <T extends Writable> AsyncGet<T, IOException>
getAsyncRpcResponse() {
return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@ -1413,9 +1413,16 @@ public Writable get(long timeout, TimeUnit unit)
}
}
}
@Override
public boolean isDone() {
synchronized (call) {
return call.done;
}
}
};
ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
ASYNC_RPC_RESPONSE.set(asyncGet);
return null;
} else {
return getRpcResponse(call, connection, -1, null);
@ -1460,10 +1467,8 @@ private Writable getRpcResponse(final Call call, final Connection connection,
synchronized (call) {
while (!call.done) {
try {
final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
timeout, unit);
call.wait(waitTimeout); // wait for the result
if (waitTimeout > 0 && !call.done) {
AsyncGet.Util.wait(call, timeout, unit);
if (timeout >= 0 && !call.done) {
return null;
}
} catch (InterruptedException ie) {

View File

@ -54,7 +54,6 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -256,14 +255,18 @@ public Object invoke(Object proxy, final Method method, Object[] args)
}
if (Client.isAsynchronousMode()) {
final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
final AsyncGet<RpcResponseWrapper, IOException> arr
= Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet
= new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
final RpcResponseWrapper rrw = timeout < 0?
frrw.get(): frrw.get(timeout, unit);
return getReturnMessage(method, rrw);
return getReturnMessage(method, arr.get(timeout, unit));
}
@Override
public boolean isDone() {
return arr.isDone();
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);

View File

@ -47,14 +47,19 @@ public interface AsyncGet<R, E extends Throwable> {
R get(long timeout, TimeUnit unit)
throws E, TimeoutException, InterruptedException;
/** @return true if the underlying computation is done; false, otherwise. */
boolean isDone();
/** Utility */
class Util {
/**
* @return {@link Object#wait(long)} timeout converted
* from {@link #get(long, TimeUnit)} timeout.
*/
public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
/** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
public static void wait(Object obj, long timeout, TimeUnit unit)
throws InterruptedException {
if (timeout < 0) {
obj.wait();
} else if (timeout > 0) {
obj.wait(unit.toMillis(timeout));
}
}
}
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -50,6 +51,11 @@ public class TestAsyncIPC {
private static Configuration conf;
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
static <T extends Writable> AsyncGetFuture<T, IOException>
getAsyncRpcResponseFuture() {
return new AsyncGetFuture<>(Client.getAsyncRpcResponse());
}
@Before
public void setupConf() {
conf = new Configuration();
@ -84,7 +90,7 @@ public void run() {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
returnFutures.put(i, Client.getAsyncRpcResponse());
returnFutures.put(i, getAsyncRpcResponseFuture());
expectedValues.put(i, param);
} catch (Exception e) {
failed = true;
@ -204,7 +210,7 @@ private void runCall(final int idx, final long param)
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
returnFutures.put(idx, Client.getAsyncRpcResponse());
returnFutures.put(idx, getAsyncRpcResponseFuture());
expectedValues.put(idx, param);
}

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
@ -51,9 +51,8 @@ public class AsyncDistributedFileSystem {
this.dfs = dfs;
}
static <T> Future<T> getReturnValue() {
return new AsyncGetFuture<>(
ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
private static <T> Future<T> getReturnValue() {
return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
}
/**

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@ -184,6 +183,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -212,8 +212,6 @@
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
private static final ThreadLocal<AsyncGet<?, Exception>>
ASYNC_RETURN_VALUE = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@ -247,12 +245,6 @@ public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
rpcProxy = proxy;
}
@SuppressWarnings("unchecked")
@Unstable
public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@ -391,8 +383,13 @@ public Void get(long timeout, TimeUnit unit) throws Exception {
asyncReturnMessage.get(timeout, unit);
return null;
}
@Override
public boolean isDone() {
return asyncReturnMessage.isDone();
}
};
ASYNC_RETURN_VALUE.set(asyncGet);
AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
}
@Override
@ -1367,17 +1364,20 @@ public AclStatus getAclStatus(String src) throws IOException {
rpcProxy.getAclStatus(null, req);
final AsyncGet<Message, Exception> asyncReturnMessage
= ProtobufRpcEngine.getAsyncReturnMessage();
final AsyncGet<AclStatus, Exception> asyncGet =
new AsyncGet<AclStatus, Exception>() {
final AsyncGet<AclStatus, Exception> asyncGet
= new AsyncGet<AclStatus, Exception>() {
@Override
public AclStatus get(long timeout, TimeUnit unit)
throws Exception {
return PBHelperClient
.convert((GetAclStatusResponseProto) asyncReturnMessage
.get(timeout, unit));
public AclStatus get(long timeout, TimeUnit unit) throws Exception {
return PBHelperClient.convert((GetAclStatusResponseProto)
asyncReturnMessage.get(timeout, unit));
}
@Override
public boolean isDone() {
return asyncReturnMessage.isDone();
}
};
ASYNC_RETURN_VALUE.set(asyncGet);
AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
return null;
} else {
return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.After;
@ -70,7 +71,7 @@ public class TestAsyncDFS {
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
private final short replFactor = 1;
private final long blockSize = 512;
private long fileLen = blockSize * 3;
private long fileLen = 0;
private final long seed = Time.now();
private final Random r = new Random(seed);
private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@ -80,7 +81,7 @@ public class TestAsyncDFS {
private Configuration conf;
private MiniDFSCluster cluster;
private FileSystem fs;
private DistributedFileSystem fs;
private AsyncDistributedFileSystem adfs;
@Before
@ -95,10 +96,10 @@ public void setup() throws IOException {
ASYNC_CALL_LIMIT);
// set server handlers
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
fs = FileSystem.get(conf);
adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
fs = cluster.getFileSystem();
adfs = fs.getAsyncDistributedFileSystem();
}
@After
@ -113,31 +114,6 @@ public void tearDown() throws IOException {
}
}
static class AclQueueEntry {
private final Object future;
private final Path path;
private final Boolean isSetAcl;
AclQueueEntry(final Object future, final Path path,
final Boolean isSetAcl) {
this.future = future;
this.path = path;
this.isSetAcl = isSetAcl;
}
public final Object getFuture() {
return future;
}
public final Path getPath() {
return path;
}
public final Boolean isSetAcl() {
return this.isSetAcl;
}
}
@Test(timeout=60000)
public void testBatchAsyncAcl() throws Exception {
final String basePath = "testBatchAsyncAcl";
@ -348,7 +324,7 @@ public AsyncDistributedFileSystem run() throws Exception {
public static void checkPermissionDenied(final Exception e, final Path dir,
final String user) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue(e.getCause() instanceof RemoteException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
@ -470,4 +446,9 @@ public void testConcurrentAsyncAPI() throws Exception {
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
}
}
@Test
public void testAsyncWithoutRetry() throws Exception {
TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
}
}

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/** Test async methods with HA setup. */
public class TestAsyncHDFSWithHA {
static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
static {
GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
}
private static <T> Future<T> getReturnValue() {
return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
}
static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
Path[] dsts) throws IOException {
for (int i = 0; i < srcs.length; i++) {
srcs[i] = new Path(dir, "src" + i);
dsts[i] = new Path(dir, "dst" + i);
dfs.mkdirs(srcs[i]);
}
}
static void runTestAsyncWithoutRetry(Configuration conf,
MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
final int num = 5;
final String renameDir = "/testAsyncWithoutRetry/";
final Path[] srcs = new Path[num + 1];
final Path[] dsts = new Path[num + 1];
mkdirs(dfs, renameDir, srcs, dsts);
// create a proxy without retry.
final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
= NameNodeProxies.createNonHAProxy(conf,
cluster.getNameNode(0).getNameNodeAddress(),
ClientProtocol.class, UserGroupInformation.getCurrentUser(),
false);
final ClientProtocol cp = proxyInfo.getProxy();
// submit async calls
Client.setAsynchronousMode(true);
final List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < num; i++) {
final String src = srcs[i].toString();
final String dst = dsts[i].toString();
LOG.info(i + ") rename " + src + " -> " + dst);
cp.rename2(src, dst);
results.add(getReturnValue());
}
Client.setAsynchronousMode(false);
// wait for the async calls
for (Future<Void> f : results) {
f.get();
}
//check results
for (int i = 0; i < num; i++) {
Assert.assertEquals(false, dfs.exists(srcs[i]));
Assert.assertEquals(true, dfs.exists(dsts[i]));
}
}
/** Testing HDFS async methods with HA setup. */
@Test(timeout = 120000)
public void testAsyncWithHAFailover() throws Exception {
final int num = 10;
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0).build();
try {
cluster.waitActive();
cluster.transitionToActive(0);
final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
cluster, conf);
runTestAsyncWithoutRetry(conf, cluster, dfs);
final String renameDir = "/testAsyncWithHAFailover/";
final Path[] srcs = new Path[num + 1];
final Path[] dsts = new Path[num + 1];
mkdirs(dfs, renameDir, srcs, dsts);
// submit async calls and trigger failover in the middle.
final AsyncDistributedFileSystem adfs
= dfs.getAsyncDistributedFileSystem();
final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
final List<Future<Void>> results = new ArrayList<>();
final List<IOException> exceptions = new ArrayList<>();
final List<Future<?>> futures = new ArrayList<>();
final int half = num/2;
for(int i = 0; i <= num; i++) {
final int id = i;
futures.add(executor.submit(new Runnable() {
@Override
public void run() {
try {
if (id == half) {
// failover
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
} else {
// rename
results.add(adfs.rename(srcs[id], dsts[id]));
}
} catch (IOException e) {
exceptions.add(e);
}
}
}));
}
// wait for the tasks
Assert.assertEquals(num + 1, futures.size());
for(int i = 0; i <= num; i++) {
futures.get(i).get();
}
// wait for the async calls
Assert.assertEquals(num, results.size());
Assert.assertTrue(exceptions.isEmpty());
for(Future<Void> r : results) {
r.get();
}
// check results
for(int i = 0; i <= num; i++) {
final boolean renamed = i != half;
Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
Assert.assertEquals(renamed, dfs.exists(dsts[i]));
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -135,7 +136,8 @@ public CouldNotCatchUpException(String message) {
}
/** Gets the filesystem instance by setting the failover configurations */
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
public static DistributedFileSystem configureFailoverFs(
MiniDFSCluster cluster, Configuration conf)
throws IOException, URISyntaxException {
return configureFailoverFs(cluster, conf, 0);
}
@ -147,13 +149,14 @@ public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configurati
* @param nsIndex namespace index starting with zero
* @throws IOException if an error occurs rolling the edit log
*/
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
public static DistributedFileSystem configureFailoverFs(
MiniDFSCluster cluster, Configuration conf,
int nsIndex) throws IOException, URISyntaxException {
conf = new Configuration(conf);
String logicalName = getLogicalHostname(cluster);
setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
return fs;
return (DistributedFileSystem)fs;
}
public static void setFailoverConfigurations(MiniDFSCluster cluster,