diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index a644aa5689b..ab8673b860e 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -345,13 +345,7 @@ - - - - - - - + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java deleted file mode 100644 index 5a03b034c3e..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.retry; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.AsyncGet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Method; -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -/** Handle async calls. */ -@InterfaceAudience.Private -public class AsyncCallHandler { - static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class); - - private static final ThreadLocal> - LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>(); - private static final ThreadLocal> - ASYNC_RETURN = new ThreadLocal<>(); - - /** @return the async return value from {@link AsyncCallHandler}. */ - @InterfaceStability.Unstable - @SuppressWarnings("unchecked") - public static AsyncGet getAsyncReturn() { - final AsyncGet asyncGet = (AsyncGet)ASYNC_RETURN.get(); - if (asyncGet != null) { - ASYNC_RETURN.set(null); - return asyncGet; - } else { - return (AsyncGet) getLowerLayerAsyncReturn(); - } - } - - /** For the lower rpc layers to set the async return value. */ - @InterfaceStability.Unstable - public static void setLowerLayerAsyncReturn( - AsyncGet asyncReturn) { - LOWER_LAYER_ASYNC_RETURN.set(asyncReturn); - } - - private static AsyncGet getLowerLayerAsyncReturn() { - final AsyncGet asyncGet = LOWER_LAYER_ASYNC_RETURN.get(); - Preconditions.checkNotNull(asyncGet); - LOWER_LAYER_ASYNC_RETURN.set(null); - return asyncGet; - } - - /** A simple concurrent queue which keeping track the empty start time. */ - static class ConcurrentQueue { - private final Queue queue = new LinkedList<>(); - private long emptyStartTime = Time.monotonicNow(); - - synchronized int size() { - return queue.size(); - } - - /** Is the queue empty for more than the given time in millisecond? */ - synchronized boolean isEmpty(long time) { - return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time; - } - - synchronized void offer(T c) { - final boolean added = queue.offer(c); - Preconditions.checkState(added); - } - - synchronized T poll() { - Preconditions.checkState(!queue.isEmpty()); - final T t = queue.poll(); - if (queue.isEmpty()) { - emptyStartTime = Time.monotonicNow(); - } - return t; - } - } - - /** A queue for handling async calls. */ - static class AsyncCallQueue { - private final ConcurrentQueue queue = new ConcurrentQueue<>(); - private final Processor processor = new Processor(); - - void addCall(AsyncCall call) { - if (LOG.isDebugEnabled()) { - LOG.debug("add " + call); - } - queue.offer(call); - processor.tryStart(); - } - - void checkCalls() { - final int size = queue.size(); - for (int i = 0; i < size; i++) { - final AsyncCall c = queue.poll(); - if (!c.isDone()) { - queue.offer(c); // the call is not done yet, add it back. - } - } - } - - /** Process the async calls in the queue. */ - private class Processor { - static final long GRACE_PERIOD = 10*1000L; - static final long SLEEP_PERIOD = 100L; - - private final AtomicReference running = new AtomicReference<>(); - - boolean isRunning(Daemon d) { - return d == running.get(); - } - - void tryStart() { - final Thread current = Thread.currentThread(); - if (running.compareAndSet(null, current)) { - final Daemon daemon = new Daemon() { - @Override - public void run() { - for (; isRunning(this);) { - try { - Thread.sleep(SLEEP_PERIOD); - } catch (InterruptedException e) { - kill(this); - return; - } - - checkCalls(); - tryStop(this); - } - } - }; - - final boolean set = running.compareAndSet(current, daemon); - Preconditions.checkState(set); - if (LOG.isDebugEnabled()) { - LOG.debug("Starting AsyncCallQueue.Processor " + daemon); - } - daemon.start(); - } - } - - void tryStop(Daemon d) { - if (queue.isEmpty(GRACE_PERIOD)) { - kill(d); - } - } - - void kill(Daemon d) { - if (LOG.isDebugEnabled()) { - LOG.debug("Killing " + d); - } - final boolean set = running.compareAndSet(d, null); - Preconditions.checkState(set); - } - } - } - - static class AsyncValue { - private V value; - - synchronized V waitAsyncValue(long timeout, TimeUnit unit) - throws InterruptedException, TimeoutException { - if (value != null) { - return value; - } - AsyncGet.Util.wait(this, timeout, unit); - if (value != null) { - return value; - } - - throw new TimeoutException("waitCallReturn timed out " - + timeout + " " + unit); - } - - synchronized void set(V v) { - Preconditions.checkNotNull(v); - Preconditions.checkState(value == null); - value = v; - notify(); - } - - synchronized boolean isDone() { - return value != null; - } - } - - static class AsyncCall extends RetryInvocationHandler.Call { - private final AsyncCallHandler asyncCallHandler; - - private final AsyncValue asyncCallReturn = new AsyncValue<>(); - private AsyncGet lowerLayerAsyncGet; - - AsyncCall(Method method, Object[] args, boolean isRpc, int callId, - RetryInvocationHandler.Counters counters, - RetryInvocationHandler retryInvocationHandler, - AsyncCallHandler asyncCallHandler) { - super(method, args, isRpc, callId, counters, retryInvocationHandler); - - this.asyncCallHandler = asyncCallHandler; - } - - /** @return true if the call is done; otherwise, return false. */ - boolean isDone() { - final CallReturn r = invokeOnce(); - switch (r.getState()) { - case RETURNED: - case EXCEPTION: - asyncCallReturn.set(r); // the async call is done - return true; - case RETRY: - invokeOnce(); - break; - case ASYNC_CALL_IN_PROGRESS: - case ASYNC_INVOKED: - // nothing to do - break; - default: - Preconditions.checkState(false); - } - return false; - } - - @Override - CallReturn invoke() throws Throwable { - LOG.debug("{}.invoke {}", getClass().getSimpleName(), this); - if (lowerLayerAsyncGet != null) { - // async call was submitted early, check the lower level async call - final boolean isDone = lowerLayerAsyncGet.isDone(); - LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone); - if (!isDone) { - return CallReturn.ASYNC_CALL_IN_PROGRESS; - } - try { - return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS)); - } finally { - lowerLayerAsyncGet = null; - } - } - - // submit a new async call - LOG.trace("invoke: ASYNC_INVOKED"); - final boolean mode = Client.isAsynchronousMode(); - try { - Client.setAsynchronousMode(true); - final Object r = invokeMethod(); - // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null. - Preconditions.checkState(r == null); - lowerLayerAsyncGet = getLowerLayerAsyncReturn(); - - if (counters.isZeros()) { - // first async attempt, initialize - LOG.trace("invoke: initAsyncCall"); - asyncCallHandler.initAsyncCall(this, asyncCallReturn); - } - return CallReturn.ASYNC_INVOKED; - } finally { - Client.setAsynchronousMode(mode); - } - } - } - - private final AsyncCallQueue asyncCalls = new AsyncCallQueue(); - private volatile boolean hasSuccessfulCall = false; - - AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc, - int callId, RetryInvocationHandler.Counters counters, - RetryInvocationHandler retryInvocationHandler) { - return new AsyncCall(method, args, isRpc, callId, counters, - retryInvocationHandler, this); - } - - boolean hasSuccessfulCall() { - return hasSuccessfulCall; - } - - private void initAsyncCall(final AsyncCall asyncCall, - final AsyncValue asyncCallReturn) { - asyncCalls.addCall(asyncCall); - - final AsyncGet asyncGet - = new AsyncGet() { - @Override - public Object get(long timeout, TimeUnit unit) throws Throwable { - final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit); - final Object r = c.getReturnValue(); - hasSuccessfulCall = true; - return r; - } - - @Override - public boolean isDone() { - return asyncCallReturn.isDone(); - } - }; - ASYNC_RETURN.set(asyncGet); - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java deleted file mode 100644 index 943725c64f4..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.retry; - -import com.google.common.base.Preconditions; - -/** The call return from a method invocation. */ -class CallReturn { - /** The return state. */ - enum State { - /** Call is returned successfully. */ - RETURNED, - /** Call throws an exception. */ - EXCEPTION, - /** Call should be retried according to the {@link RetryPolicy}. */ - RETRY, - /** Call, which is async, is still in progress. */ - ASYNC_CALL_IN_PROGRESS, - /** Call, which is async, just has been invoked. */ - ASYNC_INVOKED - } - - static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn( - State.ASYNC_CALL_IN_PROGRESS); - static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED); - static final CallReturn RETRY = new CallReturn(State.RETRY); - - private final Object returnValue; - private final Throwable thrown; - private final State state; - - CallReturn(Object r) { - this(r, null, State.RETURNED); - } - CallReturn(Throwable t) { - this(null, t, State.EXCEPTION); - Preconditions.checkNotNull(t); - } - private CallReturn(State s) { - this(null, null, s); - } - private CallReturn(Object r, Throwable t, State s) { - Preconditions.checkArgument(r == null || t == null); - returnValue = r; - thrown = t; - state = s; - } - - State getState() { - return state; - } - - Object getReturnValue() throws Throwable { - if (state == State.EXCEPTION) { - throw thrown; - } - Preconditions.checkState(state == State.RETURNED, "state == %s", state); - return returnValue; - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index f2b2c9981a7..300d0c2ab5b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -42,83 +42,11 @@ import java.util.Map; public class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); - static class Call { - private final Method method; - private final Object[] args; - private final boolean isRpc; - private final int callId; - final Counters counters; - - private final RetryPolicy retryPolicy; - private final RetryInvocationHandler retryInvocationHandler; - - Call(Method method, Object[] args, boolean isRpc, int callId, - Counters counters, RetryInvocationHandler retryInvocationHandler) { - this.method = method; - this.args = args; - this.isRpc = isRpc; - this.callId = callId; - this.counters = counters; - - this.retryPolicy = retryInvocationHandler.getRetryPolicy(method); - this.retryInvocationHandler = retryInvocationHandler; - } - - /** Invoke the call once without retrying. */ - synchronized CallReturn invokeOnce() { - try { - // The number of times this invocation handler has ever been failed over - // before this method invocation attempt. Used to prevent concurrent - // failed method invocations from triggering multiple failover attempts. - final long failoverCount = retryInvocationHandler.getFailoverCount(); - try { - return invoke(); - } catch (Exception e) { - if (LOG.isTraceEnabled()) { - LOG.trace(this, e); - } - if (Thread.currentThread().isInterrupted()) { - // If interrupted, do not retry. - throw e; - } - retryInvocationHandler.handleException( - method, retryPolicy, failoverCount, counters, e); - return CallReturn.RETRY; - } - } catch(Throwable t) { - return new CallReturn(t); - } - } - - CallReturn invoke() throws Throwable { - return new CallReturn(invokeMethod()); - } - - Object invokeMethod() throws Throwable { - if (isRpc) { - Client.setCallIdAndRetryCount(callId, counters.retries); - } - return retryInvocationHandler.invokeMethod(method, args); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "#" + callId + ": " - + method.getDeclaringClass().getSimpleName() + "." + method.getName() - + "(" + (args == null || args.length == 0? "": Arrays.toString(args)) - + ")"; - } - } - - static class Counters { + private static class Counters { /** Counter for retries. */ private int retries; /** Counter for method invocation has been failed over. */ private int failovers; - - boolean isZeros() { - return retries == 0 && failovers == 0; - } } private static class ProxyDescriptor { @@ -216,13 +144,11 @@ public class RetryInvocationHandler implements RpcInvocationHandler { private final ProxyDescriptor proxyDescriptor; - private volatile boolean hasSuccessfulCall = false; - + private volatile boolean hasMadeASuccessfulCall = false; + private final RetryPolicy defaultPolicy; private final Map methodNameToPolicyMap; - private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler(); - protected RetryInvocationHandler(FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { this(proxyProvider, retryPolicy, Collections.emptyMap()); @@ -241,35 +167,38 @@ public class RetryInvocationHandler implements RpcInvocationHandler { return policy != null? policy: defaultPolicy; } - private long getFailoverCount() { - return proxyDescriptor.getFailoverCount(); - } - - private Call newCall(Method method, Object[] args, boolean isRpc, int callId, - Counters counters) { - if (Client.isAsynchronousMode()) { - return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, - counters, this); - } else { - return new Call(method, args, isRpc, callId, counters, this); - } - } - @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy()); final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; - final Counters counters = new Counters(); + return invoke(method, args, isRpc, callId, new Counters()); + } + + private Object invoke(final Method method, final Object[] args, + final boolean isRpc, final int callId, final Counters counters) + throws Throwable { + final RetryPolicy policy = getRetryPolicy(method); - final Call call = newCall(method, args, isRpc, callId, counters); while (true) { - final CallReturn c = call.invokeOnce(); - final CallReturn.State state = c.getState(); - if (state == CallReturn.State.ASYNC_INVOKED) { - return null; // return null for async calls - } else if (c.getState() != CallReturn.State.RETRY) { - return c.getReturnValue(); + // The number of times this invocation handler has ever been failed over, + // before this method invocation attempt. Used to prevent concurrent + // failed method invocations from triggering multiple failover attempts. + final long failoverCount = proxyDescriptor.getFailoverCount(); + + if (isRpc) { + Client.setCallIdAndRetryCount(callId, counters.retries); + } + try { + final Object ret = invokeMethod(method, args); + hasMadeASuccessfulCall = true; + return ret; + } catch (Exception ex) { + if (Thread.currentThread().isInterrupted()) { + // If interrupted, do not retry. + throw ex; + } + handleException(method, policy, failoverCount, counters, ex); } } } @@ -310,8 +239,7 @@ public class RetryInvocationHandler implements RpcInvocationHandler { final int failovers, final long delay, final Exception ex) { // log info if this has made some successful calls or // this is not the first failover - final boolean info = hasSuccessfulCall || failovers != 0 - || asyncCallHandler.hasSuccessfulCall(); + final boolean info = hasMadeASuccessfulCall || failovers != 0; if (!info && !LOG.isDebugEnabled()) { return; } @@ -337,9 +265,7 @@ public class RetryInvocationHandler implements RpcInvocationHandler { if (!method.isAccessible()) { method.setAccessible(true); } - final Object r = method.invoke(proxyDescriptor.getProxy(), args); - hasSuccessfulCall = true; - return r; + return method.invoke(proxyDescriptor.getProxy(), args); } catch (InvocationTargetException e) { throw e.getCause(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index c0a14b70512..131aa8feb80 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.io.retry; -import java.io.EOFException; import java.io.IOException; import java.net.ConnectException; import java.net.NoRouteToHostException; @@ -648,9 +647,8 @@ public class RetryPolicies { return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries (" + retries + ") exceeded maximum allowed (" + maxRetries + ")"); } - + if (e instanceof ConnectException || - e instanceof EOFException || e instanceof NoRouteToHostException || e instanceof UnknownHostException || e instanceof StandbyException || diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ed8d905a963..d1d5b17c6fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -58,6 +58,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.AsyncGet; +import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.htrace.core.Span; import org.apache.htrace.core.Tracer; @@ -93,8 +94,8 @@ public class Client implements AutoCloseable { private static final ThreadLocal callId = new ThreadLocal(); private static final ThreadLocal retryCount = new ThreadLocal(); - private static final ThreadLocal> - ASYNC_RPC_RESPONSE = new ThreadLocal<>(); + private static final ThreadLocal> ASYNC_RPC_RESPONSE + = new ThreadLocal<>(); private static final ThreadLocal asynchronousMode = new ThreadLocal() { @Override @@ -105,9 +106,8 @@ public class Client implements AutoCloseable { @SuppressWarnings("unchecked") @Unstable - public static AsyncGet - getAsyncRpcResponse() { - return (AsyncGet) ASYNC_RPC_RESPONSE.get(); + public static Future getAsyncRpcResponse() { + return (Future) ASYNC_RPC_RESPONSE.get(); } /** Set call id and retry count for the next call. */ @@ -1413,16 +1413,9 @@ public class Client implements AutoCloseable { } } } - - @Override - public boolean isDone() { - synchronized (call) { - return call.done; - } - } }; - ASYNC_RPC_RESPONSE.set(asyncGet); + ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet)); return null; } else { return getRpcResponse(call, connection, -1, null); @@ -1467,8 +1460,10 @@ public class Client implements AutoCloseable { synchronized (call) { while (!call.done) { try { - AsyncGet.Util.wait(call, timeout, unit); - if (timeout >= 0 && !call.done) { + final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout( + timeout, unit); + call.wait(waitTimeout); // wait for the result + if (waitTimeout > 0 && !call.done) { return null; } } catch (InterruptedException ie) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 315ec67decb..0f43fc6d3d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -54,6 +54,7 @@ import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -255,18 +256,14 @@ public class ProtobufRpcEngine implements RpcEngine { } if (Client.isAsynchronousMode()) { - final AsyncGet arr - = Client.getAsyncRpcResponse(); + final Future frrw = Client.getAsyncRpcResponse(); final AsyncGet asyncGet = new AsyncGet() { @Override public Message get(long timeout, TimeUnit unit) throws Exception { - return getReturnMessage(method, arr.get(timeout, unit)); - } - - @Override - public boolean isDone() { - return arr.isDone(); + final RpcResponseWrapper rrw = timeout < 0? + frrw.get(): frrw.get(timeout, unit); + return getReturnMessage(method, rrw); } }; ASYNC_RETURN_MESSAGE.set(asyncGet); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java index f124890dd53..5eac869632e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java @@ -47,19 +47,14 @@ public interface AsyncGet { R get(long timeout, TimeUnit unit) throws E, TimeoutException, InterruptedException; - /** @return true if the underlying computation is done; false, otherwise. */ - boolean isDone(); - /** Utility */ class Util { - /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */ - public static void wait(Object obj, long timeout, TimeUnit unit) - throws InterruptedException { - if (timeout < 0) { - obj.wait(); - } else if (timeout > 0) { - obj.wait(unit.toMillis(timeout)); - } + /** + * @return {@link Object#wait(long)} timeout converted + * from {@link #get(long, TimeUnit)} timeout. + */ + public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){ + return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 4450c0ccbf1..0ad191b4fee 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -30,7 +30,6 @@ import org.apache.hadoop.ipc.TestIPC.TestServer; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -51,11 +50,6 @@ public class TestAsyncIPC { private static Configuration conf; private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); - static AsyncGetFuture - getAsyncRpcResponseFuture() { - return new AsyncGetFuture<>(Client.getAsyncRpcResponse()); - } - @Before public void setupConf() { conf = new Configuration(); @@ -90,7 +84,7 @@ public class TestAsyncIPC { try { final long param = TestIPC.RANDOM.nextLong(); TestIPC.call(client, param, server, conf); - returnFutures.put(i, getAsyncRpcResponseFuture()); + returnFutures.put(i, Client.getAsyncRpcResponse()); expectedValues.put(i, param); } catch (Exception e) { failed = true; @@ -210,7 +204,7 @@ public class TestAsyncIPC { private void doCall(final int idx, final long param) throws IOException { TestIPC.call(client, param, server, conf); - returnFutures.put(idx, getAsyncRpcResponseFuture()); + returnFutures.put(idx, Client.getAsyncRpcResponse()); expectedValues.put(idx, param); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 824336a2922..29bac2ac4af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; -import org.apache.hadoop.io.retry.AsyncCallHandler; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.hadoop.ipc.Client; @@ -51,8 +51,9 @@ public class AsyncDistributedFileSystem { this.dfs = dfs; } - private static Future getReturnValue() { - return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn()); + static Future getReturnValue() { + return new AsyncGetFuture<>( + ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index bcf5269489b..2373da786d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; @@ -183,7 +184,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.retry.AsyncCallHandler; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -212,6 +212,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet; public class ClientNamenodeProtocolTranslatorPB implements ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { final private ClientNamenodeProtocolPB rpcProxy; + private static final ThreadLocal> + ASYNC_RETURN_VALUE = new ThreadLocal<>(); static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = GetServerDefaultsRequestProto.newBuilder().build(); @@ -245,6 +247,12 @@ public class ClientNamenodeProtocolTranslatorPB implements rpcProxy = proxy; } + @SuppressWarnings("unchecked") + @Unstable + public static AsyncGet getAsyncReturnValue() { + return (AsyncGet) ASYNC_RETURN_VALUE.get(); + } + @Override public void close() { RPC.stopProxy(rpcProxy); @@ -383,13 +391,8 @@ public class ClientNamenodeProtocolTranslatorPB implements asyncReturnMessage.get(timeout, unit); return null; } - - @Override - public boolean isDone() { - return asyncReturnMessage.isDone(); - } }; - AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet); + ASYNC_RETURN_VALUE.set(asyncGet); } @Override @@ -1364,20 +1367,17 @@ public class ClientNamenodeProtocolTranslatorPB implements rpcProxy.getAclStatus(null, req); final AsyncGet asyncReturnMessage = ProtobufRpcEngine.getAsyncReturnMessage(); - final AsyncGet asyncGet - = new AsyncGet() { - @Override - public AclStatus get(long timeout, TimeUnit unit) throws Exception { - return PBHelperClient.convert((GetAclStatusResponseProto) - asyncReturnMessage.get(timeout, unit)); - } - - @Override - public boolean isDone() { - return asyncReturnMessage.isDone(); - } - }; - AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet); + final AsyncGet asyncGet = + new AsyncGet() { + @Override + public AclStatus get(long timeout, TimeUnit unit) + throws Exception { + return PBHelperClient + .convert((GetAclStatusResponseProto) asyncReturnMessage + .get(timeout, unit)); + } + }; + ASYNC_RETURN_VALUE.set(asyncGet); return null; } else { return PBHelperClient.convert(rpcProxy.getAclStatus(null, req)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java index 6a602900b2d..c7615a94559 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers; import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.junit.After; @@ -71,7 +70,7 @@ public class TestAsyncDFS { public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class); private final short replFactor = 1; private final long blockSize = 512; - private long fileLen = 0; + private long fileLen = blockSize * 3; private final long seed = Time.now(); private final Random r = new Random(seed); private final PermissionGenerator permGenerator = new PermissionGenerator(r); @@ -81,7 +80,7 @@ public class TestAsyncDFS { private Configuration conf; private MiniDFSCluster cluster; - private DistributedFileSystem fs; + private FileSystem fs; private AsyncDistributedFileSystem adfs; @Before @@ -96,10 +95,10 @@ public class TestAsyncDFS { ASYNC_CALL_LIMIT); // set server handlers conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); - fs = cluster.getFileSystem(); - adfs = fs.getAsyncDistributedFileSystem(); + fs = FileSystem.get(conf); + adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); } @After @@ -114,6 +113,31 @@ public class TestAsyncDFS { } } + static class AclQueueEntry { + private final Object future; + private final Path path; + private final Boolean isSetAcl; + + AclQueueEntry(final Object future, final Path path, + final Boolean isSetAcl) { + this.future = future; + this.path = path; + this.isSetAcl = isSetAcl; + } + + public final Object getFuture() { + return future; + } + + public final Path getPath() { + return path; + } + + public final Boolean isSetAcl() { + return this.isSetAcl; + } + } + @Test(timeout=60000) public void testBatchAsyncAcl() throws Exception { final String basePath = "testBatchAsyncAcl"; @@ -324,7 +348,7 @@ public class TestAsyncDFS { public static void checkPermissionDenied(final Exception e, final Path dir, final String user) { - assertTrue(e.getCause() instanceof RemoteException); + assertTrue(e.getCause() instanceof ExecutionException); assertTrue("Permission denied messages must carry AccessControlException", e.getMessage().contains("AccessControlException")); assertTrue("Permission denied messages must carry the username", e @@ -446,9 +470,4 @@ public class TestAsyncDFS { assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup())); } } - - @Test - public void testAsyncWithoutRetry() throws Exception { - TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java deleted file mode 100644 index 9ade8ec31f6..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; -import org.apache.hadoop.io.retry.AsyncCallHandler; -import org.apache.hadoop.io.retry.RetryInvocationHandler; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; -import org.apache.log4j.Level; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** Test async methods with HA setup. */ -public class TestAsyncHDFSWithHA { - static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class); - static { - GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL); - } - - private static Future getReturnValue() { - return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn()); - } - - static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs, - Path[] dsts) throws IOException { - for (int i = 0; i < srcs.length; i++) { - srcs[i] = new Path(dir, "src" + i); - dsts[i] = new Path(dir, "dst" + i); - dfs.mkdirs(srcs[i]); - } - } - - static void runTestAsyncWithoutRetry(Configuration conf, - MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception { - final int num = 5; - - final String renameDir = "/testAsyncWithoutRetry/"; - final Path[] srcs = new Path[num + 1]; - final Path[] dsts = new Path[num + 1]; - mkdirs(dfs, renameDir, srcs, dsts); - - // create a proxy without retry. - final NameNodeProxiesClient.ProxyAndInfo proxyInfo - = NameNodeProxies.createNonHAProxy(conf, - cluster.getNameNode(0).getNameNodeAddress(), - ClientProtocol.class, UserGroupInformation.getCurrentUser(), - false); - final ClientProtocol cp = proxyInfo.getProxy(); - - // submit async calls - Client.setAsynchronousMode(true); - final List> results = new ArrayList<>(); - for (int i = 0; i < num; i++) { - final String src = srcs[i].toString(); - final String dst = dsts[i].toString(); - LOG.info(i + ") rename " + src + " -> " + dst); - cp.rename2(src, dst); - results.add(getReturnValue()); - } - Client.setAsynchronousMode(false); - - // wait for the async calls - for (Future f : results) { - f.get(); - } - - //check results - for (int i = 0; i < num; i++) { - Assert.assertEquals(false, dfs.exists(srcs[i])); - Assert.assertEquals(true, dfs.exists(dsts[i])); - } - } - - /** Testing HDFS async methods with HA setup. */ - @Test(timeout = 120000) - public void testAsyncWithHAFailover() throws Exception { - final int num = 10; - - final Configuration conf = new HdfsConfiguration(); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(0).build(); - - try { - cluster.waitActive(); - cluster.transitionToActive(0); - - final DistributedFileSystem dfs = HATestUtil.configureFailoverFs( - cluster, conf); - runTestAsyncWithoutRetry(conf, cluster, dfs); - - final String renameDir = "/testAsyncWithHAFailover/"; - final Path[] srcs = new Path[num + 1]; - final Path[] dsts = new Path[num + 1]; - mkdirs(dfs, renameDir, srcs, dsts); - - // submit async calls and trigger failover in the middle. - final AsyncDistributedFileSystem adfs - = dfs.getAsyncDistributedFileSystem(); - final ExecutorService executor = Executors.newFixedThreadPool(num + 1); - - final List> results = new ArrayList<>(); - final List exceptions = new ArrayList<>(); - final List> futures = new ArrayList<>(); - final int half = num/2; - for(int i = 0; i <= num; i++) { - final int id = i; - futures.add(executor.submit(new Runnable() { - @Override - public void run() { - try { - if (id == half) { - // failover - cluster.shutdownNameNode(0); - cluster.transitionToActive(1); - } else { - // rename - results.add(adfs.rename(srcs[id], dsts[id])); - } - } catch (IOException e) { - exceptions.add(e); - } - } - })); - } - - // wait for the tasks - Assert.assertEquals(num + 1, futures.size()); - for(int i = 0; i <= num; i++) { - futures.get(i).get(); - } - // wait for the async calls - Assert.assertEquals(num, results.size()); - Assert.assertTrue(exceptions.isEmpty()); - for(Future r : results) { - r.get(); - } - - // check results - for(int i = 0; i <= num; i++) { - final boolean renamed = i != half; - Assert.assertEquals(!renamed, dfs.exists(srcs[i])); - Assert.assertEquals(renamed, dfs.exists(dsts[i])); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 169bbeef119..42cf3d4aa85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -136,8 +135,7 @@ public abstract class HATestUtil { } /** Gets the filesystem instance by setting the failover configurations */ - public static DistributedFileSystem configureFailoverFs( - MiniDFSCluster cluster, Configuration conf) + public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf) throws IOException, URISyntaxException { return configureFailoverFs(cluster, conf, 0); } @@ -149,14 +147,13 @@ public abstract class HATestUtil { * @param nsIndex namespace index starting with zero * @throws IOException if an error occurs rolling the edit log */ - public static DistributedFileSystem configureFailoverFs( - MiniDFSCluster cluster, Configuration conf, + public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf, int nsIndex) throws IOException, URISyntaxException { conf = new Configuration(conf); String logicalName = getLogicalHostname(cluster); setFailoverConfigurations(cluster, conf, logicalName, nsIndex); FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); - return (DistributedFileSystem)fs; + return fs; } public static void setFailoverConfigurations(MiniDFSCluster cluster,