Revert "HADOOP-13226 Support async call retry and failover."
This reverts commit a8941d7790b2209ac779c9372298b833ededd132.
This commit is contained in:
parent
6f69113417
commit
886e239606
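
For orientation, the caller-facing effect of this revert (visible in the Client and TestAsyncIPC hunks below) is that an asynchronous RPC response is again obtained as a plain java.util.concurrent.Future from Client.getAsyncRpcResponse(), instead of through the AsyncGet/AsyncCallHandler indirection that HADOOP-13226 introduced. A minimal sketch of the restored usage follows; the surrounding call site and the LongWritable payload type are illustrative assumptions only, not part of this commit.

    import java.util.concurrent.Future;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.ipc.Client;

    // Sketch only: issue an RPC in asynchronous mode, then fetch its Future.
    Client.setAsynchronousMode(true);
    // ... perform the RPC through the proxy / RPC engine here ...
    Future<LongWritable> response = Client.getAsyncRpcResponse();
    Client.setAsynchronousMode(false);
    LongWritable value = response.get();   // block only when the value is needed
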
@@ -363,13 +363,7 @@
       <Bug pattern="SF_SWITCH_FALLTHROUGH" />
     </Match>
 
-    <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
-    <Match>
-      <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
-      <Method name="wait" />
-      <Bug pattern="WA_NOT_IN_LOOP" />
-    </Match>
-
+    <!-- Synchronization performed on util.concurrent instance. -->
     <Match>
       <Class name="org.apache.hadoop.service.AbstractService" />
       <Method name="stop" />
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** Handle async calls. */
-@InterfaceAudience.Private
-public class AsyncCallHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
-
-  private static final ThreadLocal<AsyncGet<?, Exception>>
-      LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
-  private static final ThreadLocal<AsyncGet<Object, Throwable>>
-      ASYNC_RETURN = new ThreadLocal<>();
-
-  /** @return the async return value from {@link AsyncCallHandler}. */
-  @InterfaceStability.Unstable
-  @SuppressWarnings("unchecked")
-  public static <R, T extends Throwable> AsyncGet<R, T> getAsyncReturn() {
-    final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
-    if (asyncGet != null) {
-      ASYNC_RETURN.set(null);
-      return asyncGet;
-    } else {
-      return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
-    }
-  }
-
-  /** For the lower rpc layers to set the async return value. */
-  @InterfaceStability.Unstable
-  public static void setLowerLayerAsyncReturn(
-      AsyncGet<?, Exception> asyncReturn) {
-    LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
-  }
-
-  private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
-    final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
-    Preconditions.checkNotNull(asyncGet);
-    LOWER_LAYER_ASYNC_RETURN.set(null);
-    return asyncGet;
-  }
-
-  /** A simple concurrent queue which keeping track the empty start time. */
-  static class ConcurrentQueue<T> {
-    private final Queue<T> queue = new LinkedList<>();
-    private long emptyStartTime = Time.monotonicNow();
-
-    synchronized int size() {
-      return queue.size();
-    }
-
-    /** Is the queue empty for more than the given time in millisecond? */
-    synchronized boolean isEmpty(long time) {
-      return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
-    }
-
-    synchronized void offer(T c) {
-      final boolean added = queue.offer(c);
-      Preconditions.checkState(added);
-    }
-
-    synchronized T poll() {
-      Preconditions.checkState(!queue.isEmpty());
-      final T t = queue.poll();
-      if (queue.isEmpty()) {
-        emptyStartTime = Time.monotonicNow();
-      }
-      return t;
-    }
-  }
-
-  /** A queue for handling async calls. */
-  static class AsyncCallQueue {
-    private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
-    private final Processor processor = new Processor();
-
-    void addCall(AsyncCall call) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("add " + call);
-      }
-      queue.offer(call);
-      processor.tryStart();
-    }
-
-    void checkCalls() {
-      final int size = queue.size();
-      for (int i = 0; i < size; i++) {
-        final AsyncCall c = queue.poll();
-        if (!c.isDone()) {
-          queue.offer(c); // the call is not done yet, add it back.
-        }
-      }
-    }
-
-    /** Process the async calls in the queue. */
-    private class Processor {
-      static final long GRACE_PERIOD = 10*1000L;
-      static final long SLEEP_PERIOD = 100L;
-
-      private final AtomicReference<Thread> running = new AtomicReference<>();
-
-      boolean isRunning(Daemon d) {
-        return d == running.get();
-      }
-
-      void tryStart() {
-        final Thread current = Thread.currentThread();
-        if (running.compareAndSet(null, current)) {
-          final Daemon daemon = new Daemon() {
-            @Override
-            public void run() {
-              for (; isRunning(this);) {
-                try {
-                  Thread.sleep(SLEEP_PERIOD);
-                } catch (InterruptedException e) {
-                  kill(this);
-                  return;
-                }
-
-                checkCalls();
-                tryStop(this);
-              }
-            }
-          };
-
-          final boolean set = running.compareAndSet(current, daemon);
-          Preconditions.checkState(set);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
-          }
-          daemon.start();
-        }
-      }
-
-      void tryStop(Daemon d) {
-        if (queue.isEmpty(GRACE_PERIOD)) {
-          kill(d);
-        }
-      }
-
-      void kill(Daemon d) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Killing " + d);
-        }
-        final boolean set = running.compareAndSet(d, null);
-        Preconditions.checkState(set);
-      }
-    }
-  }
-
-  static class AsyncValue<V> {
-    private V value;
-
-    synchronized V waitAsyncValue(long timeout, TimeUnit unit)
-        throws InterruptedException, TimeoutException {
-      if (value != null) {
-        return value;
-      }
-      AsyncGet.Util.wait(this, timeout, unit);
-      if (value != null) {
-        return value;
-      }
-
-      throw new TimeoutException("waitCallReturn timed out "
-          + timeout + " " + unit);
-    }
-
-    synchronized void set(V v) {
-      Preconditions.checkNotNull(v);
-      Preconditions.checkState(value == null);
-      value = v;
-      notify();
-    }
-
-    synchronized boolean isDone() {
-      return value != null;
-    }
-  }
-
-  static class AsyncCall extends RetryInvocationHandler.Call {
-    private final AsyncCallHandler asyncCallHandler;
-
-    private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
-    private AsyncGet<?, Exception> lowerLayerAsyncGet;
-
-    AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
-              RetryInvocationHandler.Counters counters,
-              RetryInvocationHandler<?> retryInvocationHandler,
-              AsyncCallHandler asyncCallHandler) {
-      super(method, args, isRpc, callId, counters, retryInvocationHandler);
-
-      this.asyncCallHandler = asyncCallHandler;
-    }
-
-    /** @return true if the call is done; otherwise, return false. */
-    boolean isDone() {
-      final CallReturn r = invokeOnce();
-      switch (r.getState()) {
-        case RETURNED:
-        case EXCEPTION:
-          asyncCallReturn.set(r); // the async call is done
-          return true;
-        case RETRY:
-          invokeOnce();
-          break;
-        case ASYNC_CALL_IN_PROGRESS:
-        case ASYNC_INVOKED:
-          // nothing to do
-          break;
-        default:
-          Preconditions.checkState(false);
-      }
-      return false;
-    }
-
-    @Override
-    CallReturn invoke() throws Throwable {
-      LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
-      if (lowerLayerAsyncGet != null) {
-        // async call was submitted early, check the lower level async call
-        final boolean isDone = lowerLayerAsyncGet.isDone();
-        LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
-        if (!isDone) {
-          return CallReturn.ASYNC_CALL_IN_PROGRESS;
-        }
-        try {
-          return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
-        } finally {
-          lowerLayerAsyncGet = null;
-        }
-      }
-
-      // submit a new async call
-      LOG.trace("invoke: ASYNC_INVOKED");
-      final boolean mode = Client.isAsynchronousMode();
-      try {
-        Client.setAsynchronousMode(true);
-        final Object r = invokeMethod();
-        // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
-        Preconditions.checkState(r == null);
-        lowerLayerAsyncGet = getLowerLayerAsyncReturn();
-
-        if (counters.isZeros()) {
-          // first async attempt, initialize
-          LOG.trace("invoke: initAsyncCall");
-          asyncCallHandler.initAsyncCall(this, asyncCallReturn);
-        }
-        return CallReturn.ASYNC_INVOKED;
-      } finally {
-        Client.setAsynchronousMode(mode);
-      }
-    }
-  }
-
-  private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
-  private volatile boolean hasSuccessfulCall = false;
-
-  AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
-      int callId, RetryInvocationHandler.Counters counters,
-      RetryInvocationHandler<?> retryInvocationHandler) {
-    return new AsyncCall(method, args, isRpc, callId, counters,
-        retryInvocationHandler, this);
-  }
-
-  boolean hasSuccessfulCall() {
-    return hasSuccessfulCall;
-  }
-
-  private void initAsyncCall(final AsyncCall asyncCall,
-      final AsyncValue<CallReturn> asyncCallReturn) {
-    asyncCalls.addCall(asyncCall);
-
-    final AsyncGet<Object, Throwable> asyncGet
-        = new AsyncGet<Object, Throwable>() {
-      @Override
-      public Object get(long timeout, TimeUnit unit) throws Throwable {
-        final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
-        final Object r = c.getReturnValue();
-        hasSuccessfulCall = true;
-        return r;
-      }
-
-      @Override
-      public boolean isDone() {
-        return asyncCallReturn.isDone();
-      }
-    };
-    ASYNC_RETURN.set(asyncGet);
-  }
-}
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-
-/** The call return from a method invocation. */
-class CallReturn {
-  /** The return state. */
-  enum State {
-    /** Call is returned successfully. */
-    RETURNED,
-    /** Call throws an exception. */
-    EXCEPTION,
-    /** Call should be retried according to the {@link RetryPolicy}. */
-    RETRY,
-    /** Call, which is async, is still in progress. */
-    ASYNC_CALL_IN_PROGRESS,
-    /** Call, which is async, just has been invoked. */
-    ASYNC_INVOKED
-  }
-
-  static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
-      State.ASYNC_CALL_IN_PROGRESS);
-  static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
-  static final CallReturn RETRY = new CallReturn(State.RETRY);
-
-  private final Object returnValue;
-  private final Throwable thrown;
-  private final State state;
-
-  CallReturn(Object r) {
-    this(r, null, State.RETURNED);
-  }
-  CallReturn(Throwable t) {
-    this(null, t, State.EXCEPTION);
-    Preconditions.checkNotNull(t);
-  }
-  private CallReturn(State s) {
-    this(null, null, s);
-  }
-  private CallReturn(Object r, Throwable t, State s) {
-    Preconditions.checkArgument(r == null || t == null);
-    returnValue = r;
-    thrown = t;
-    state = s;
-  }
-
-  State getState() {
-    return state;
-  }
-
-  Object getReturnValue() throws Throwable {
-    if (state == State.EXCEPTION) {
-      throw thrown;
-    }
-    Preconditions.checkState(state == State.RETURNED, "state == %s", state);
-    return returnValue;
-  }
-}
@@ -42,83 +42,11 @@
 public class RetryInvocationHandler<T> implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
 
-  static class Call {
-    private final Method method;
-    private final Object[] args;
-    private final boolean isRpc;
-    private final int callId;
-    final Counters counters;
-
-    private final RetryPolicy retryPolicy;
-    private final RetryInvocationHandler<?> retryInvocationHandler;
-
-    Call(Method method, Object[] args, boolean isRpc, int callId,
-        Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
-      this.method = method;
-      this.args = args;
-      this.isRpc = isRpc;
-      this.callId = callId;
-      this.counters = counters;
-
-      this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
-      this.retryInvocationHandler = retryInvocationHandler;
-    }
-
-    /** Invoke the call once without retrying. */
-    synchronized CallReturn invokeOnce() {
-      try {
-        // The number of times this invocation handler has ever been failed over
-        // before this method invocation attempt. Used to prevent concurrent
-        // failed method invocations from triggering multiple failover attempts.
-        final long failoverCount = retryInvocationHandler.getFailoverCount();
-        try {
-          return invoke();
-        } catch (Exception e) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this, e);
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            // If interrupted, do not retry.
-            throw e;
-          }
-          retryInvocationHandler.handleException(
-              method, retryPolicy, failoverCount, counters, e);
-          return CallReturn.RETRY;
-        }
-      } catch(Throwable t) {
-        return new CallReturn(t);
-      }
-    }
-
-    CallReturn invoke() throws Throwable {
-      return new CallReturn(invokeMethod());
-    }
-
-    Object invokeMethod() throws Throwable {
-      if (isRpc) {
-        Client.setCallIdAndRetryCount(callId, counters.retries);
-      }
-      return retryInvocationHandler.invokeMethod(method, args);
-    }
-
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + "#" + callId + ": "
-          + method.getDeclaringClass().getSimpleName() + "." + method.getName()
-          + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
-          + ")";
-    }
-  }
-
-  static class Counters {
+  private static class Counters {
     /** Counter for retries. */
     private int retries;
     /** Counter for method invocation has been failed over. */
     private int failovers;
-
-    boolean isZeros() {
-      return retries == 0 && failovers == 0;
-    }
   }
 
   private static class ProxyDescriptor<T> {
@@ -216,13 +144,11 @@ static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
 
   private final ProxyDescriptor<T> proxyDescriptor;
 
-  private volatile boolean hasSuccessfulCall = false;
+  private volatile boolean hasMadeASuccessfulCall = false;
 
   private final RetryPolicy defaultPolicy;
   private final Map<String,RetryPolicy> methodNameToPolicyMap;
 
-  private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
-
   protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
       RetryPolicy retryPolicy) {
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -241,35 +167,38 @@ private RetryPolicy getRetryPolicy(Method method) {
     return policy != null? policy: defaultPolicy;
   }
 
-  private long getFailoverCount() {
-    return proxyDescriptor.getFailoverCount();
-  }
-
-  private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
-      Counters counters) {
-    if (Client.isAsynchronousMode()) {
-      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
-          counters, this);
-    } else {
-      return new Call(method, args, isRpc, callId, counters, this);
-    }
-  }
-
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
     final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
     final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
-    final Counters counters = new Counters();
+    return invoke(method, args, isRpc, callId, new Counters());
+  }
 
-    final Call call = newCall(method, args, isRpc, callId, counters);
+  private Object invoke(final Method method, final Object[] args,
+      final boolean isRpc, final int callId, final Counters counters)
+      throws Throwable {
+    final RetryPolicy policy = getRetryPolicy(method);
+
     while (true) {
-      final CallReturn c = call.invokeOnce();
-      final CallReturn.State state = c.getState();
-      if (state == CallReturn.State.ASYNC_INVOKED) {
-        return null; // return null for async calls
-      } else if (c.getState() != CallReturn.State.RETRY) {
-        return c.getReturnValue();
+      // The number of times this invocation handler has ever been failed over,
+      // before this method invocation attempt. Used to prevent concurrent
+      // failed method invocations from triggering multiple failover attempts.
+      final long failoverCount = proxyDescriptor.getFailoverCount();
+
+      if (isRpc) {
+        Client.setCallIdAndRetryCount(callId, counters.retries);
+      }
+      try {
+        final Object ret = invokeMethod(method, args);
+        hasMadeASuccessfulCall = true;
+        return ret;
+      } catch (Exception ex) {
+        if (Thread.currentThread().isInterrupted()) {
+          // If interrupted, do not retry.
+          throw ex;
+        }
+        handleException(method, policy, failoverCount, counters, ex);
       }
     }
   }
@@ -310,8 +239,7 @@ private void log(final Method method, final boolean isFailover,
       final int failovers, final long delay, final Exception ex) {
     // log info if this has made some successful calls or
     // this is not the first failover
-    final boolean info = hasSuccessfulCall || failovers != 0
-        || asyncCallHandler.hasSuccessfulCall();
+    final boolean info = hasMadeASuccessfulCall || failovers != 0;
     if (!info && !LOG.isDebugEnabled()) {
       return;
     }
@@ -337,9 +265,7 @@ protected Object invokeMethod(Method method, Object[] args) throws Throwable {
       if (!method.isAccessible()) {
        method.setAccessible(true);
      }
-      final Object r = method.invoke(proxyDescriptor.getProxy(), args);
-      hasSuccessfulCall = true;
-      return r;
+      return method.invoke(proxyDescriptor.getProxy(), args);
     } catch (InvocationTargetException e) {
       throw e.getCause();
     }
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.io.retry;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
@@ -650,7 +649,6 @@ public RetryAction shouldRetry(Exception e, int retries,
       }
 
       if (e instanceof ConnectException ||
-          e instanceof EOFException ||
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
           e instanceof StandbyException ||
@@ -58,6 +58,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
@@ -93,8 +94,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
-      ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+      = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -105,9 +106,8 @@ protected Boolean initialValue() {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T extends Writable> AsyncGet<T, IOException>
-      getAsyncRpcResponse() {
-    return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+  public static <T> Future<T> getAsyncRpcResponse() {
+    return (Future<T>) ASYNC_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1414,16 +1414,9 @@ public Writable get(long timeout, TimeUnit unit)
             }
           }
         }
 
-        @Override
-        public boolean isDone() {
-          synchronized (call) {
-            return call.done;
-          }
-        }
       };
 
-      ASYNC_RPC_RESPONSE.set(asyncGet);
+      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
       return null;
     } else {
       return getRpcResponse(call, connection, -1, null);
@@ -1468,8 +1461,10 @@ private Writable getRpcResponse(final Call call, final Connection connection,
     synchronized (call) {
       while (!call.done) {
         try {
-          AsyncGet.Util.wait(call, timeout, unit);
-          if (timeout >= 0 && !call.done) {
+          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+              timeout, unit);
+          call.wait(waitTimeout); // wait for the result
+          if (waitTimeout > 0 && !call.done) {
             return null;
           }
         } catch (InterruptedException ie) {
@@ -54,6 +54,7 @@
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -255,18 +256,14 @@ public Object invoke(Object proxy, final Method method, Object[] args)
     }
 
     if (Client.isAsynchronousMode()) {
-      final AsyncGet<RpcResponseWrapper, IOException> arr
-          = Client.getAsyncRpcResponse();
+      final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
       final AsyncGet<Message, Exception> asyncGet
           = new AsyncGet<Message, Exception>() {
         @Override
         public Message get(long timeout, TimeUnit unit) throws Exception {
-          return getReturnMessage(method, arr.get(timeout, unit));
-        }
-
-        @Override
-        public boolean isDone() {
-          return arr.isDone();
+          final RpcResponseWrapper rrw = timeout < 0?
+              frrw.get(): frrw.get(timeout, unit);
+          return getReturnMessage(method, rrw);
         }
       };
       ASYNC_RETURN_MESSAGE.set(asyncGet);
@@ -47,19 +47,14 @@ public interface AsyncGet<R, E extends Throwable> {
   R get(long timeout, TimeUnit unit)
       throws E, TimeoutException, InterruptedException;
 
-  /** @return true if the underlying computation is done; false, otherwise. */
-  boolean isDone();
-
   /** Utility */
   class Util {
-    /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
-    public static void wait(Object obj, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      if (timeout < 0) {
-        obj.wait();
-      } else if (timeout > 0) {
-        obj.wait(unit.toMillis(timeout));
-      }
+    /**
+     * @return {@link Object#wait(long)} timeout converted
+     *         from {@link #get(long, TimeUnit)} timeout.
+     */
+    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
     }
   }
 }
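
A brief note on the conversion performed by the restored asyncGetTimeout2WaitTimeout, read directly off the ternary above (the sample values are assumptions for illustration only): a negative AsyncGet timeout means "wait indefinitely" and maps to Object.wait(0), a zero timeout means "poll" and maps to a 1 ms wait, and any positive timeout is converted to milliseconds.

    // Illustrative values only; mirrors the ternary restored above.
    long forever = AsyncGet.Util.asyncGetTimeout2WaitTimeout(-1, TimeUnit.SECONDS); // 0 -> wait() blocks until notified
    long poll    = AsyncGet.Util.asyncGetTimeout2WaitTimeout(0, TimeUnit.SECONDS);  // 1 -> effectively a quick poll
    long bounded = AsyncGet.Util.asyncGetTimeout2WaitTimeout(3, TimeUnit.SECONDS);  // 3000 (milliseconds)
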
@@ -30,7 +30,6 @@
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,12 +50,6 @@ public class TestAsyncIPC {
   private static Configuration conf;
   private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
 
-  static <T extends Writable> AsyncGetFuture<T, IOException>
-      getAsyncRpcResponseFuture() {
-    return (AsyncGetFuture<T, IOException>) new AsyncGetFuture<>(
-        Client.getAsyncRpcResponse());
-  }
-
   @Before
   public void setupConf() {
     conf = new Configuration();
@@ -91,7 +84,7 @@ public void run() {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+          Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {
@@ -212,7 +205,7 @@ private void runCall(final int idx, final long param)
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+      Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
       returnFutures.put(idx, returnFuture);
       expectedValues.put(idx, param);
     }
@@ -29,7 +29,8 @@
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
@@ -51,8 +52,10 @@ public class AsyncDistributedFileSystem {
     this.dfs = dfs;
   }
 
-  private static <T> Future<T> getReturnValue() {
-    return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+  static <T> Future<T> getReturnValue() {
+    final AsyncGet<T, Exception> asyncGet
+        = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
+    return new AsyncGetFuture<>(asyncGet);
   }
 
   /**
@@ -29,6 +29,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -175,7 +176,6 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -204,6 +204,8 @@
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
+  private static final ThreadLocal<AsyncGet<?, Exception>>
+      ASYNC_RETURN_VALUE = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -236,6 +238,12 @@ public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
     rpcProxy = proxy;
   }
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+  }
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -374,13 +382,8 @@ public Void get(long timeout, TimeUnit unit) throws Exception {
           asyncReturnMessage.get(timeout, unit);
           return null;
         }
-
-        @Override
-        public boolean isDone() {
-          return asyncReturnMessage.isDone();
-        }
       };
-      AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+      ASYNC_RETURN_VALUE.set(asyncGet);
     }
 
     @Override
@@ -1352,20 +1355,17 @@ public AclStatus getAclStatus(String src) throws IOException {
       rpcProxy.getAclStatus(null, req);
       final AsyncGet<Message, Exception> asyncReturnMessage
           = ProtobufRpcEngine.getAsyncReturnMessage();
-      final AsyncGet<AclStatus, Exception> asyncGet
-          = new AsyncGet<AclStatus, Exception>() {
-        @Override
-        public AclStatus get(long timeout, TimeUnit unit) throws Exception {
-          return PBHelperClient.convert((GetAclStatusResponseProto)
-              asyncReturnMessage.get(timeout, unit));
-        }
-
-        @Override
-        public boolean isDone() {
-          return asyncReturnMessage.isDone();
-        }
-      };
-      AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+      final AsyncGet<AclStatus, Exception> asyncGet =
+          new AsyncGet<AclStatus, Exception>() {
+            @Override
+            public AclStatus get(long timeout, TimeUnit unit)
+                throws Exception {
+              return PBHelperClient
+                  .convert((GetAclStatusResponseProto) asyncReturnMessage
+                      .get(timeout, unit));
+            }
+          };
+      ASYNC_RETURN_VALUE.set(asyncGet);
       return null;
     } else {
       return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
|
@ -55,7 +55,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
|
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
|
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
|
||||||
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
|
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -71,7 +70,7 @@ public class TestAsyncDFS {
|
|||||||
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
|
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
|
||||||
private final short replFactor = 1;
|
private final short replFactor = 1;
|
||||||
private final long blockSize = 512;
|
private final long blockSize = 512;
|
||||||
private long fileLen = 0;
|
private long fileLen = blockSize * 3;
|
||||||
private final long seed = Time.now();
|
private final long seed = Time.now();
|
||||||
private final Random r = new Random(seed);
|
private final Random r = new Random(seed);
|
||||||
private final PermissionGenerator permGenerator = new PermissionGenerator(r);
|
private final PermissionGenerator permGenerator = new PermissionGenerator(r);
|
||||||
@ -81,7 +80,7 @@ public class TestAsyncDFS {
|
|||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private DistributedFileSystem fs;
|
private FileSystem fs;
|
||||||
private AsyncDistributedFileSystem adfs;
|
private AsyncDistributedFileSystem adfs;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@ -96,10 +95,10 @@ public void setup() throws IOException {
|
|||||||
ASYNC_CALL_LIMIT);
|
ASYNC_CALL_LIMIT);
|
||||||
// set server handlers
|
// set server handlers
|
||||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
fs = cluster.getFileSystem();
|
fs = FileSystem.get(conf);
|
||||||
adfs = fs.getAsyncDistributedFileSystem();
|
adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -114,6 +113,31 @@ public void tearDown() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class AclQueueEntry {
|
||||||
|
private final Object future;
|
||||||
|
private final Path path;
|
||||||
|
private final Boolean isSetAcl;
|
||||||
|
|
||||||
|
AclQueueEntry(final Object future, final Path path,
|
||||||
|
final Boolean isSetAcl) {
|
||||||
|
this.future = future;
|
||||||
|
this.path = path;
|
||||||
|
this.isSetAcl = isSetAcl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final Object getFuture() {
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final Path getPath() {
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final Boolean isSetAcl() {
|
||||||
|
return this.isSetAcl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testBatchAsyncAcl() throws Exception {
|
public void testBatchAsyncAcl() throws Exception {
|
||||||
final String basePath = "testBatchAsyncAcl";
|
final String basePath = "testBatchAsyncAcl";
|
||||||
@ -324,7 +348,7 @@ public AsyncDistributedFileSystem run() throws Exception {
|
|||||||
|
|
||||||
public static void checkPermissionDenied(final Exception e, final Path dir,
|
public static void checkPermissionDenied(final Exception e, final Path dir,
|
||||||
final String user) {
|
final String user) {
|
||||||
assertTrue(e.getCause() instanceof RemoteException);
|
assertTrue(e.getCause() instanceof ExecutionException);
|
||||||
assertTrue("Permission denied messages must carry AccessControlException",
|
assertTrue("Permission denied messages must carry AccessControlException",
|
||||||
e.getMessage().contains("AccessControlException"));
|
e.getMessage().contains("AccessControlException"));
|
||||||
assertTrue("Permission denied messages must carry the username", e
|
assertTrue("Permission denied messages must carry the username", e
|
||||||
@ -446,9 +470,4 @@ public void testConcurrentAsyncAPI() throws Exception {
|
|||||||
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
|
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAsyncWithoutRetry() throws Exception {
|
|
||||||
TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,182 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
|
||||||
import org.apache.hadoop.io.retry.AsyncCallHandler;
|
|
||||||
import org.apache.hadoop.io.retry.RetryInvocationHandler;
|
|
||||||
import org.apache.hadoop.ipc.Client;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
|
|
||||||
import org.apache.log4j.Level;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
/** Test async methods with HA setup. */
|
|
||||||
public class TestAsyncHDFSWithHA {
|
|
||||||
static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
|
|
||||||
static {
|
|
||||||
GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <T> Future<T> getReturnValue() {
|
|
||||||
return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
|
|
||||||
Path[] dsts) throws IOException {
|
|
||||||
for (int i = 0; i < srcs.length; i++) {
|
|
||||||
srcs[i] = new Path(dir, "src" + i);
|
|
||||||
dsts[i] = new Path(dir, "dst" + i);
|
|
||||||
dfs.mkdirs(srcs[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void runTestAsyncWithoutRetry(Configuration conf,
|
|
||||||
MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
|
|
||||||
final int num = 5;
|
|
||||||
|
|
||||||
final String renameDir = "/testAsyncWithoutRetry/";
|
|
||||||
final Path[] srcs = new Path[num + 1];
|
|
||||||
final Path[] dsts = new Path[num + 1];
|
|
||||||
mkdirs(dfs, renameDir, srcs, dsts);
|
|
||||||
|
|
||||||
// create a proxy without retry.
|
|
||||||
final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
|
|
||||||
= NameNodeProxies.createNonHAProxy(conf,
|
|
||||||
cluster.getNameNode(0).getNameNodeAddress(),
|
|
||||||
ClientProtocol.class, UserGroupInformation.getCurrentUser(),
|
|
||||||
false);
|
|
||||||
final ClientProtocol cp = proxyInfo.getProxy();
|
|
||||||
|
|
||||||
// submit async calls
|
|
||||||
Client.setAsynchronousMode(true);
|
|
||||||
final List<Future<Void>> results = new ArrayList<>();
|
|
||||||
for (int i = 0; i < num; i++) {
|
|
||||||
final String src = srcs[i].toString();
|
|
||||||
final String dst = dsts[i].toString();
|
|
||||||
LOG.info(i + ") rename " + src + " -> " + dst);
|
|
||||||
cp.rename2(src, dst);
|
|
||||||
final Future<Void> returnValue = getReturnValue();
|
|
||||||
results.add(returnValue);
|
|
||||||
}
|
|
||||||
Client.setAsynchronousMode(false);
|
|
||||||
|
|
||||||
// wait for the async calls
|
|
||||||
for (Future<Void> f : results) {
|
|
||||||
f.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
//check results
|
|
||||||
for (int i = 0; i < num; i++) {
|
|
||||||
Assert.assertEquals(false, dfs.exists(srcs[i]));
|
|
||||||
Assert.assertEquals(true, dfs.exists(dsts[i]));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Testing HDFS async methods with HA setup. */
|
|
||||||
@Test(timeout = 120000)
|
|
||||||
public void testAsyncWithHAFailover() throws Exception {
|
|
||||||
final int num = 10;
|
|
||||||
|
|
||||||
final Configuration conf = new HdfsConfiguration();
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
||||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
|
||||||
.numDataNodes(0).build();
|
|
||||||
|
|
||||||
try {
|
|
||||||
cluster.waitActive();
|
|
||||||
cluster.transitionToActive(0);
|
|
||||||
|
|
||||||
final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
|
|
||||||
cluster, conf);
|
|
||||||
runTestAsyncWithoutRetry(conf, cluster, dfs);
|
|
||||||
|
|
||||||
final String renameDir = "/testAsyncWithHAFailover/";
|
|
||||||
final Path[] srcs = new Path[num + 1];
|
|
||||||
final Path[] dsts = new Path[num + 1];
|
|
||||||
mkdirs(dfs, renameDir, srcs, dsts);
|
|
||||||
|
|
||||||
// submit async calls and trigger failover in the middle.
|
|
||||||
final AsyncDistributedFileSystem adfs
|
|
||||||
= dfs.getAsyncDistributedFileSystem();
|
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
|
|
||||||
|
|
||||||
final List<Future<Void>> results = new ArrayList<>();
|
|
||||||
final List<IOException> exceptions = new ArrayList<>();
|
|
||||||
final List<Future<?>> futures = new ArrayList<>();
|
|
||||||
final int half = num/2;
|
|
||||||
for(int i = 0; i <= num; i++) {
|
|
||||||
final int id = i;
|
|
||||||
futures.add(executor.submit(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
if (id == half) {
|
|
||||||
// failover
|
|
||||||
cluster.shutdownNameNode(0);
|
|
||||||
cluster.transitionToActive(1);
|
|
||||||
} else {
|
|
||||||
// rename
|
|
||||||
results.add(adfs.rename(srcs[id], dsts[id]));
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
exceptions.add(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for the tasks
|
|
||||||
Assert.assertEquals(num + 1, futures.size());
|
|
||||||
for(int i = 0; i <= num; i++) {
|
|
||||||
futures.get(i).get();
|
|
||||||
}
|
|
||||||
// wait for the async calls
|
|
||||||
Assert.assertEquals(num, results.size());
|
|
||||||
Assert.assertTrue(exceptions.isEmpty());
|
|
||||||
for(Future<Void> r : results) {
|
|
||||||
r.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
// check results
|
|
||||||
for(int i = 0; i <= num; i++) {
|
|
||||||
final boolean renamed = i != half;
|
|
||||||
Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
|
|
||||||
Assert.assertEquals(renamed, dfs.exists(dsts[i]));
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (cluster != null) {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -33,7 +33,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -132,8 +131,7 @@ public CouldNotCatchUpException(String message) {
   }
 
   /** Gets the filesystem instance by setting the failover configurations */
-  public static DistributedFileSystem configureFailoverFs(
-      MiniDFSCluster cluster, Configuration conf)
+  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
       throws IOException, URISyntaxException {
     return configureFailoverFs(cluster, conf, 0);
   }
@@ -145,14 +143,13 @@ public static DistributedFileSystem configureFailoverFs(
    * @param nsIndex namespace index starting with zero
    * @throws IOException if an error occurs rolling the edit log
    */
-  public static DistributedFileSystem configureFailoverFs(
-      MiniDFSCluster cluster, Configuration conf,
+  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
       int nsIndex) throws IOException, URISyntaxException {
     conf = new Configuration(conf);
     String logicalName = getLogicalHostname(cluster);
     setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
     FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
-    return (DistributedFileSystem)fs;
+    return fs;
   }
 
   public static void setFailoverConfigurations(MiniDFSCluster cluster,