diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index ab8673b860e..a644aa5689b 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -345,7 +345,13 @@ - + + + + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java new file mode 100644 index 00000000000..5a03b034c3e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.retry; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +/** Handle async calls. */ +@InterfaceAudience.Private +public class AsyncCallHandler { + static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class); + + private static final ThreadLocal> + LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>(); + private static final ThreadLocal> + ASYNC_RETURN = new ThreadLocal<>(); + + /** @return the async return value from {@link AsyncCallHandler}. */ + @InterfaceStability.Unstable + @SuppressWarnings("unchecked") + public static AsyncGet getAsyncReturn() { + final AsyncGet asyncGet = (AsyncGet)ASYNC_RETURN.get(); + if (asyncGet != null) { + ASYNC_RETURN.set(null); + return asyncGet; + } else { + return (AsyncGet) getLowerLayerAsyncReturn(); + } + } + + /** For the lower rpc layers to set the async return value. */ + @InterfaceStability.Unstable + public static void setLowerLayerAsyncReturn( + AsyncGet asyncReturn) { + LOWER_LAYER_ASYNC_RETURN.set(asyncReturn); + } + + private static AsyncGet getLowerLayerAsyncReturn() { + final AsyncGet asyncGet = LOWER_LAYER_ASYNC_RETURN.get(); + Preconditions.checkNotNull(asyncGet); + LOWER_LAYER_ASYNC_RETURN.set(null); + return asyncGet; + } + + /** A simple concurrent queue which keeping track the empty start time. */ + static class ConcurrentQueue { + private final Queue queue = new LinkedList<>(); + private long emptyStartTime = Time.monotonicNow(); + + synchronized int size() { + return queue.size(); + } + + /** Is the queue empty for more than the given time in millisecond? */ + synchronized boolean isEmpty(long time) { + return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time; + } + + synchronized void offer(T c) { + final boolean added = queue.offer(c); + Preconditions.checkState(added); + } + + synchronized T poll() { + Preconditions.checkState(!queue.isEmpty()); + final T t = queue.poll(); + if (queue.isEmpty()) { + emptyStartTime = Time.monotonicNow(); + } + return t; + } + } + + /** A queue for handling async calls. */ + static class AsyncCallQueue { + private final ConcurrentQueue queue = new ConcurrentQueue<>(); + private final Processor processor = new Processor(); + + void addCall(AsyncCall call) { + if (LOG.isDebugEnabled()) { + LOG.debug("add " + call); + } + queue.offer(call); + processor.tryStart(); + } + + void checkCalls() { + final int size = queue.size(); + for (int i = 0; i < size; i++) { + final AsyncCall c = queue.poll(); + if (!c.isDone()) { + queue.offer(c); // the call is not done yet, add it back. + } + } + } + + /** Process the async calls in the queue. */ + private class Processor { + static final long GRACE_PERIOD = 10*1000L; + static final long SLEEP_PERIOD = 100L; + + private final AtomicReference running = new AtomicReference<>(); + + boolean isRunning(Daemon d) { + return d == running.get(); + } + + void tryStart() { + final Thread current = Thread.currentThread(); + if (running.compareAndSet(null, current)) { + final Daemon daemon = new Daemon() { + @Override + public void run() { + for (; isRunning(this);) { + try { + Thread.sleep(SLEEP_PERIOD); + } catch (InterruptedException e) { + kill(this); + return; + } + + checkCalls(); + tryStop(this); + } + } + }; + + final boolean set = running.compareAndSet(current, daemon); + Preconditions.checkState(set); + if (LOG.isDebugEnabled()) { + LOG.debug("Starting AsyncCallQueue.Processor " + daemon); + } + daemon.start(); + } + } + + void tryStop(Daemon d) { + if (queue.isEmpty(GRACE_PERIOD)) { + kill(d); + } + } + + void kill(Daemon d) { + if (LOG.isDebugEnabled()) { + LOG.debug("Killing " + d); + } + final boolean set = running.compareAndSet(d, null); + Preconditions.checkState(set); + } + } + } + + static class AsyncValue { + private V value; + + synchronized V waitAsyncValue(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException { + if (value != null) { + return value; + } + AsyncGet.Util.wait(this, timeout, unit); + if (value != null) { + return value; + } + + throw new TimeoutException("waitCallReturn timed out " + + timeout + " " + unit); + } + + synchronized void set(V v) { + Preconditions.checkNotNull(v); + Preconditions.checkState(value == null); + value = v; + notify(); + } + + synchronized boolean isDone() { + return value != null; + } + } + + static class AsyncCall extends RetryInvocationHandler.Call { + private final AsyncCallHandler asyncCallHandler; + + private final AsyncValue asyncCallReturn = new AsyncValue<>(); + private AsyncGet lowerLayerAsyncGet; + + AsyncCall(Method method, Object[] args, boolean isRpc, int callId, + RetryInvocationHandler.Counters counters, + RetryInvocationHandler retryInvocationHandler, + AsyncCallHandler asyncCallHandler) { + super(method, args, isRpc, callId, counters, retryInvocationHandler); + + this.asyncCallHandler = asyncCallHandler; + } + + /** @return true if the call is done; otherwise, return false. */ + boolean isDone() { + final CallReturn r = invokeOnce(); + switch (r.getState()) { + case RETURNED: + case EXCEPTION: + asyncCallReturn.set(r); // the async call is done + return true; + case RETRY: + invokeOnce(); + break; + case ASYNC_CALL_IN_PROGRESS: + case ASYNC_INVOKED: + // nothing to do + break; + default: + Preconditions.checkState(false); + } + return false; + } + + @Override + CallReturn invoke() throws Throwable { + LOG.debug("{}.invoke {}", getClass().getSimpleName(), this); + if (lowerLayerAsyncGet != null) { + // async call was submitted early, check the lower level async call + final boolean isDone = lowerLayerAsyncGet.isDone(); + LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone); + if (!isDone) { + return CallReturn.ASYNC_CALL_IN_PROGRESS; + } + try { + return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS)); + } finally { + lowerLayerAsyncGet = null; + } + } + + // submit a new async call + LOG.trace("invoke: ASYNC_INVOKED"); + final boolean mode = Client.isAsynchronousMode(); + try { + Client.setAsynchronousMode(true); + final Object r = invokeMethod(); + // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null. + Preconditions.checkState(r == null); + lowerLayerAsyncGet = getLowerLayerAsyncReturn(); + + if (counters.isZeros()) { + // first async attempt, initialize + LOG.trace("invoke: initAsyncCall"); + asyncCallHandler.initAsyncCall(this, asyncCallReturn); + } + return CallReturn.ASYNC_INVOKED; + } finally { + Client.setAsynchronousMode(mode); + } + } + } + + private final AsyncCallQueue asyncCalls = new AsyncCallQueue(); + private volatile boolean hasSuccessfulCall = false; + + AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc, + int callId, RetryInvocationHandler.Counters counters, + RetryInvocationHandler retryInvocationHandler) { + return new AsyncCall(method, args, isRpc, callId, counters, + retryInvocationHandler, this); + } + + boolean hasSuccessfulCall() { + return hasSuccessfulCall; + } + + private void initAsyncCall(final AsyncCall asyncCall, + final AsyncValue asyncCallReturn) { + asyncCalls.addCall(asyncCall); + + final AsyncGet asyncGet + = new AsyncGet() { + @Override + public Object get(long timeout, TimeUnit unit) throws Throwable { + final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit); + final Object r = c.getReturnValue(); + hasSuccessfulCall = true; + return r; + } + + @Override + public boolean isDone() { + return asyncCallReturn.isDone(); + } + }; + ASYNC_RETURN.set(asyncGet); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java new file mode 100644 index 00000000000..943725c64f4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.retry; + +import com.google.common.base.Preconditions; + +/** The call return from a method invocation. */ +class CallReturn { + /** The return state. */ + enum State { + /** Call is returned successfully. */ + RETURNED, + /** Call throws an exception. */ + EXCEPTION, + /** Call should be retried according to the {@link RetryPolicy}. */ + RETRY, + /** Call, which is async, is still in progress. */ + ASYNC_CALL_IN_PROGRESS, + /** Call, which is async, just has been invoked. */ + ASYNC_INVOKED + } + + static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn( + State.ASYNC_CALL_IN_PROGRESS); + static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED); + static final CallReturn RETRY = new CallReturn(State.RETRY); + + private final Object returnValue; + private final Throwable thrown; + private final State state; + + CallReturn(Object r) { + this(r, null, State.RETURNED); + } + CallReturn(Throwable t) { + this(null, t, State.EXCEPTION); + Preconditions.checkNotNull(t); + } + private CallReturn(State s) { + this(null, null, s); + } + private CallReturn(Object r, Throwable t, State s) { + Preconditions.checkArgument(r == null || t == null); + returnValue = r; + thrown = t; + state = s; + } + + State getState() { + return state; + } + + Object getReturnValue() throws Throwable { + if (state == State.EXCEPTION) { + throw thrown; + } + Preconditions.checkState(state == State.RETURNED, "state == %s", state); + return returnValue; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 300d0c2ab5b..f2b2c9981a7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -42,11 +42,83 @@ import java.util.Map; public class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); - private static class Counters { + static class Call { + private final Method method; + private final Object[] args; + private final boolean isRpc; + private final int callId; + final Counters counters; + + private final RetryPolicy retryPolicy; + private final RetryInvocationHandler retryInvocationHandler; + + Call(Method method, Object[] args, boolean isRpc, int callId, + Counters counters, RetryInvocationHandler retryInvocationHandler) { + this.method = method; + this.args = args; + this.isRpc = isRpc; + this.callId = callId; + this.counters = counters; + + this.retryPolicy = retryInvocationHandler.getRetryPolicy(method); + this.retryInvocationHandler = retryInvocationHandler; + } + + /** Invoke the call once without retrying. */ + synchronized CallReturn invokeOnce() { + try { + // The number of times this invocation handler has ever been failed over + // before this method invocation attempt. Used to prevent concurrent + // failed method invocations from triggering multiple failover attempts. + final long failoverCount = retryInvocationHandler.getFailoverCount(); + try { + return invoke(); + } catch (Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace(this, e); + } + if (Thread.currentThread().isInterrupted()) { + // If interrupted, do not retry. + throw e; + } + retryInvocationHandler.handleException( + method, retryPolicy, failoverCount, counters, e); + return CallReturn.RETRY; + } + } catch(Throwable t) { + return new CallReturn(t); + } + } + + CallReturn invoke() throws Throwable { + return new CallReturn(invokeMethod()); + } + + Object invokeMethod() throws Throwable { + if (isRpc) { + Client.setCallIdAndRetryCount(callId, counters.retries); + } + return retryInvocationHandler.invokeMethod(method, args); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "#" + callId + ": " + + method.getDeclaringClass().getSimpleName() + "." + method.getName() + + "(" + (args == null || args.length == 0? "": Arrays.toString(args)) + + ")"; + } + } + + static class Counters { /** Counter for retries. */ private int retries; /** Counter for method invocation has been failed over. */ private int failovers; + + boolean isZeros() { + return retries == 0 && failovers == 0; + } } private static class ProxyDescriptor { @@ -144,11 +216,13 @@ public class RetryInvocationHandler implements RpcInvocationHandler { private final ProxyDescriptor proxyDescriptor; - private volatile boolean hasMadeASuccessfulCall = false; - + private volatile boolean hasSuccessfulCall = false; + private final RetryPolicy defaultPolicy; private final Map methodNameToPolicyMap; + private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler(); + protected RetryInvocationHandler(FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { this(proxyProvider, retryPolicy, Collections.emptyMap()); @@ -167,38 +241,35 @@ public class RetryInvocationHandler implements RpcInvocationHandler { return policy != null? policy: defaultPolicy; } + private long getFailoverCount() { + return proxyDescriptor.getFailoverCount(); + } + + private Call newCall(Method method, Object[] args, boolean isRpc, int callId, + Counters counters) { + if (Client.isAsynchronousMode()) { + return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, + counters, this); + } else { + return new Call(method, args, isRpc, callId, counters, this); + } + } + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy()); final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; - return invoke(method, args, isRpc, callId, new Counters()); - } - - private Object invoke(final Method method, final Object[] args, - final boolean isRpc, final int callId, final Counters counters) - throws Throwable { - final RetryPolicy policy = getRetryPolicy(method); + final Counters counters = new Counters(); + final Call call = newCall(method, args, isRpc, callId, counters); while (true) { - // The number of times this invocation handler has ever been failed over, - // before this method invocation attempt. Used to prevent concurrent - // failed method invocations from triggering multiple failover attempts. - final long failoverCount = proxyDescriptor.getFailoverCount(); - - if (isRpc) { - Client.setCallIdAndRetryCount(callId, counters.retries); - } - try { - final Object ret = invokeMethod(method, args); - hasMadeASuccessfulCall = true; - return ret; - } catch (Exception ex) { - if (Thread.currentThread().isInterrupted()) { - // If interrupted, do not retry. - throw ex; - } - handleException(method, policy, failoverCount, counters, ex); + final CallReturn c = call.invokeOnce(); + final CallReturn.State state = c.getState(); + if (state == CallReturn.State.ASYNC_INVOKED) { + return null; // return null for async calls + } else if (c.getState() != CallReturn.State.RETRY) { + return c.getReturnValue(); } } } @@ -239,7 +310,8 @@ public class RetryInvocationHandler implements RpcInvocationHandler { final int failovers, final long delay, final Exception ex) { // log info if this has made some successful calls or // this is not the first failover - final boolean info = hasMadeASuccessfulCall || failovers != 0; + final boolean info = hasSuccessfulCall || failovers != 0 + || asyncCallHandler.hasSuccessfulCall(); if (!info && !LOG.isDebugEnabled()) { return; } @@ -265,7 +337,9 @@ public class RetryInvocationHandler implements RpcInvocationHandler { if (!method.isAccessible()) { method.setAccessible(true); } - return method.invoke(proxyDescriptor.getProxy(), args); + final Object r = method.invoke(proxyDescriptor.getProxy(), args); + hasSuccessfulCall = true; + return r; } catch (InvocationTargetException e) { throw e.getCause(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index 131aa8feb80..c0a14b70512 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.retry; +import java.io.EOFException; import java.io.IOException; import java.net.ConnectException; import java.net.NoRouteToHostException; @@ -647,8 +648,9 @@ public class RetryPolicies { return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries (" + retries + ") exceeded maximum allowed (" + maxRetries + ")"); } - + if (e instanceof ConnectException || + e instanceof EOFException || e instanceof NoRouteToHostException || e instanceof UnknownHostException || e instanceof StandbyException || diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index d1d5b17c6fb..ed8d905a963 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -58,7 +58,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.AsyncGet; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.htrace.core.Span; import org.apache.htrace.core.Tracer; @@ -94,8 +93,8 @@ public class Client implements AutoCloseable { private static final ThreadLocal callId = new ThreadLocal(); private static final ThreadLocal retryCount = new ThreadLocal(); - private static final ThreadLocal> ASYNC_RPC_RESPONSE - = new ThreadLocal<>(); + private static final ThreadLocal> + ASYNC_RPC_RESPONSE = new ThreadLocal<>(); private static final ThreadLocal asynchronousMode = new ThreadLocal() { @Override @@ -106,8 +105,9 @@ public class Client implements AutoCloseable { @SuppressWarnings("unchecked") @Unstable - public static Future getAsyncRpcResponse() { - return (Future) ASYNC_RPC_RESPONSE.get(); + public static AsyncGet + getAsyncRpcResponse() { + return (AsyncGet) ASYNC_RPC_RESPONSE.get(); } /** Set call id and retry count for the next call. */ @@ -1413,9 +1413,16 @@ public class Client implements AutoCloseable { } } } + + @Override + public boolean isDone() { + synchronized (call) { + return call.done; + } + } }; - ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet)); + ASYNC_RPC_RESPONSE.set(asyncGet); return null; } else { return getRpcResponse(call, connection, -1, null); @@ -1460,10 +1467,8 @@ public class Client implements AutoCloseable { synchronized (call) { while (!call.done) { try { - final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout( - timeout, unit); - call.wait(waitTimeout); // wait for the result - if (waitTimeout > 0 && !call.done) { + AsyncGet.Util.wait(call, timeout, unit); + if (timeout >= 0 && !call.done) { return null; } } catch (InterruptedException ie) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 0f43fc6d3d1..315ec67decb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -54,7 +54,6 @@ import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -256,14 +255,18 @@ public class ProtobufRpcEngine implements RpcEngine { } if (Client.isAsynchronousMode()) { - final Future frrw = Client.getAsyncRpcResponse(); + final AsyncGet arr + = Client.getAsyncRpcResponse(); final AsyncGet asyncGet = new AsyncGet() { @Override public Message get(long timeout, TimeUnit unit) throws Exception { - final RpcResponseWrapper rrw = timeout < 0? - frrw.get(): frrw.get(timeout, unit); - return getReturnMessage(method, rrw); + return getReturnMessage(method, arr.get(timeout, unit)); + } + + @Override + public boolean isDone() { + return arr.isDone(); } }; ASYNC_RETURN_MESSAGE.set(asyncGet); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java index 5eac869632e..f124890dd53 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java @@ -47,14 +47,19 @@ public interface AsyncGet { R get(long timeout, TimeUnit unit) throws E, TimeoutException, InterruptedException; + /** @return true if the underlying computation is done; false, otherwise. */ + boolean isDone(); + /** Utility */ class Util { - /** - * @return {@link Object#wait(long)} timeout converted - * from {@link #get(long, TimeUnit)} timeout. - */ - public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){ - return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout); + /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */ + public static void wait(Object obj, long timeout, TimeUnit unit) + throws InterruptedException { + if (timeout < 0) { + obj.wait(); + } else if (timeout > 0) { + obj.wait(unit.toMillis(timeout)); + } } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 0ad191b4fee..4450c0ccbf1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.TestIPC.TestServer; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -50,6 +51,11 @@ public class TestAsyncIPC { private static Configuration conf; private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); + static AsyncGetFuture + getAsyncRpcResponseFuture() { + return new AsyncGetFuture<>(Client.getAsyncRpcResponse()); + } + @Before public void setupConf() { conf = new Configuration(); @@ -84,7 +90,7 @@ public class TestAsyncIPC { try { final long param = TestIPC.RANDOM.nextLong(); TestIPC.call(client, param, server, conf); - returnFutures.put(i, Client.getAsyncRpcResponse()); + returnFutures.put(i, getAsyncRpcResponseFuture()); expectedValues.put(i, param); } catch (Exception e) { failed = true; @@ -204,7 +210,7 @@ public class TestAsyncIPC { private void doCall(final int idx, final long param) throws IOException { TestIPC.call(client, param, server, conf); - returnFutures.put(idx, Client.getAsyncRpcResponse()); + returnFutures.put(idx, getAsyncRpcResponseFuture()); expectedValues.put(idx, param); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 29bac2ac4af..824336a2922 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; +import org.apache.hadoop.io.retry.AsyncCallHandler; import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.hadoop.ipc.Client; @@ -51,9 +51,8 @@ public class AsyncDistributedFileSystem { this.dfs = dfs; } - static Future getReturnValue() { - return new AsyncGetFuture<>( - ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue()); + private static Future getReturnValue() { + return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 2373da786d9..bcf5269489b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; @@ -184,6 +183,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.AsyncCallHandler; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -212,8 +212,6 @@ import org.apache.hadoop.util.concurrent.AsyncGet; public class ClientNamenodeProtocolTranslatorPB implements ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { final private ClientNamenodeProtocolPB rpcProxy; - private static final ThreadLocal> - ASYNC_RETURN_VALUE = new ThreadLocal<>(); static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = GetServerDefaultsRequestProto.newBuilder().build(); @@ -247,12 +245,6 @@ public class ClientNamenodeProtocolTranslatorPB implements rpcProxy = proxy; } - @SuppressWarnings("unchecked") - @Unstable - public static AsyncGet getAsyncReturnValue() { - return (AsyncGet) ASYNC_RETURN_VALUE.get(); - } - @Override public void close() { RPC.stopProxy(rpcProxy); @@ -391,8 +383,13 @@ public class ClientNamenodeProtocolTranslatorPB implements asyncReturnMessage.get(timeout, unit); return null; } + + @Override + public boolean isDone() { + return asyncReturnMessage.isDone(); + } }; - ASYNC_RETURN_VALUE.set(asyncGet); + AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet); } @Override @@ -1367,17 +1364,20 @@ public class ClientNamenodeProtocolTranslatorPB implements rpcProxy.getAclStatus(null, req); final AsyncGet asyncReturnMessage = ProtobufRpcEngine.getAsyncReturnMessage(); - final AsyncGet asyncGet = - new AsyncGet() { - @Override - public AclStatus get(long timeout, TimeUnit unit) - throws Exception { - return PBHelperClient - .convert((GetAclStatusResponseProto) asyncReturnMessage - .get(timeout, unit)); - } - }; - ASYNC_RETURN_VALUE.set(asyncGet); + final AsyncGet asyncGet + = new AsyncGet() { + @Override + public AclStatus get(long timeout, TimeUnit unit) throws Exception { + return PBHelperClient.convert((GetAclStatusResponseProto) + asyncReturnMessage.get(timeout, unit)); + } + + @Override + public boolean isDone() { + return asyncReturnMessage.isDone(); + } + }; + AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet); return null; } else { return PBHelperClient.convert(rpcProxy.getAclStatus(null, req)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java index c7615a94559..6a602900b2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers; import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.junit.After; @@ -70,7 +71,7 @@ public class TestAsyncDFS { public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class); private final short replFactor = 1; private final long blockSize = 512; - private long fileLen = blockSize * 3; + private long fileLen = 0; private final long seed = Time.now(); private final Random r = new Random(seed); private final PermissionGenerator permGenerator = new PermissionGenerator(r); @@ -80,7 +81,7 @@ public class TestAsyncDFS { private Configuration conf; private MiniDFSCluster cluster; - private FileSystem fs; + private DistributedFileSystem fs; private AsyncDistributedFileSystem adfs; @Before @@ -95,10 +96,10 @@ public class TestAsyncDFS { ASYNC_CALL_LIMIT); // set server handlers conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); cluster.waitActive(); - fs = FileSystem.get(conf); - adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); + fs = cluster.getFileSystem(); + adfs = fs.getAsyncDistributedFileSystem(); } @After @@ -113,31 +114,6 @@ public class TestAsyncDFS { } } - static class AclQueueEntry { - private final Object future; - private final Path path; - private final Boolean isSetAcl; - - AclQueueEntry(final Object future, final Path path, - final Boolean isSetAcl) { - this.future = future; - this.path = path; - this.isSetAcl = isSetAcl; - } - - public final Object getFuture() { - return future; - } - - public final Path getPath() { - return path; - } - - public final Boolean isSetAcl() { - return this.isSetAcl; - } - } - @Test(timeout=60000) public void testBatchAsyncAcl() throws Exception { final String basePath = "testBatchAsyncAcl"; @@ -348,7 +324,7 @@ public class TestAsyncDFS { public static void checkPermissionDenied(final Exception e, final Path dir, final String user) { - assertTrue(e.getCause() instanceof ExecutionException); + assertTrue(e.getCause() instanceof RemoteException); assertTrue("Permission denied messages must carry AccessControlException", e.getMessage().contains("AccessControlException")); assertTrue("Permission denied messages must carry the username", e @@ -470,4 +446,9 @@ public class TestAsyncDFS { assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup())); } } + + @Test + public void testAsyncWithoutRetry() throws Exception { + TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java new file mode 100644 index 00000000000..9ade8ec31f6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.io.retry.AsyncCallHandler; +import org.apache.hadoop.io.retry.RetryInvocationHandler; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.concurrent.AsyncGetFuture; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** Test async methods with HA setup. */ +public class TestAsyncHDFSWithHA { + static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class); + static { + GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL); + } + + private static Future getReturnValue() { + return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn()); + } + + static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs, + Path[] dsts) throws IOException { + for (int i = 0; i < srcs.length; i++) { + srcs[i] = new Path(dir, "src" + i); + dsts[i] = new Path(dir, "dst" + i); + dfs.mkdirs(srcs[i]); + } + } + + static void runTestAsyncWithoutRetry(Configuration conf, + MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception { + final int num = 5; + + final String renameDir = "/testAsyncWithoutRetry/"; + final Path[] srcs = new Path[num + 1]; + final Path[] dsts = new Path[num + 1]; + mkdirs(dfs, renameDir, srcs, dsts); + + // create a proxy without retry. + final NameNodeProxiesClient.ProxyAndInfo proxyInfo + = NameNodeProxies.createNonHAProxy(conf, + cluster.getNameNode(0).getNameNodeAddress(), + ClientProtocol.class, UserGroupInformation.getCurrentUser(), + false); + final ClientProtocol cp = proxyInfo.getProxy(); + + // submit async calls + Client.setAsynchronousMode(true); + final List> results = new ArrayList<>(); + for (int i = 0; i < num; i++) { + final String src = srcs[i].toString(); + final String dst = dsts[i].toString(); + LOG.info(i + ") rename " + src + " -> " + dst); + cp.rename2(src, dst); + results.add(getReturnValue()); + } + Client.setAsynchronousMode(false); + + // wait for the async calls + for (Future f : results) { + f.get(); + } + + //check results + for (int i = 0; i < num; i++) { + Assert.assertEquals(false, dfs.exists(srcs[i])); + Assert.assertEquals(true, dfs.exists(dsts[i])); + } + } + + /** Testing HDFS async methods with HA setup. */ + @Test(timeout = 120000) + public void testAsyncWithHAFailover() throws Exception { + final int num = 10; + + final Configuration conf = new HdfsConfiguration(); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0).build(); + + try { + cluster.waitActive(); + cluster.transitionToActive(0); + + final DistributedFileSystem dfs = HATestUtil.configureFailoverFs( + cluster, conf); + runTestAsyncWithoutRetry(conf, cluster, dfs); + + final String renameDir = "/testAsyncWithHAFailover/"; + final Path[] srcs = new Path[num + 1]; + final Path[] dsts = new Path[num + 1]; + mkdirs(dfs, renameDir, srcs, dsts); + + // submit async calls and trigger failover in the middle. + final AsyncDistributedFileSystem adfs + = dfs.getAsyncDistributedFileSystem(); + final ExecutorService executor = Executors.newFixedThreadPool(num + 1); + + final List> results = new ArrayList<>(); + final List exceptions = new ArrayList<>(); + final List> futures = new ArrayList<>(); + final int half = num/2; + for(int i = 0; i <= num; i++) { + final int id = i; + futures.add(executor.submit(new Runnable() { + @Override + public void run() { + try { + if (id == half) { + // failover + cluster.shutdownNameNode(0); + cluster.transitionToActive(1); + } else { + // rename + results.add(adfs.rename(srcs[id], dsts[id])); + } + } catch (IOException e) { + exceptions.add(e); + } + } + })); + } + + // wait for the tasks + Assert.assertEquals(num + 1, futures.size()); + for(int i = 0; i <= num; i++) { + futures.get(i).get(); + } + // wait for the async calls + Assert.assertEquals(num, results.size()); + Assert.assertTrue(exceptions.isEmpty()); + for(Future r : results) { + r.get(); + } + + // check results + for(int i = 0; i <= num; i++) { + final boolean renamed = i != half; + Assert.assertEquals(!renamed, dfs.exists(srcs[i])); + Assert.assertEquals(renamed, dfs.exists(dsts[i])); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 42cf3d4aa85..169bbeef119 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -135,7 +136,8 @@ public abstract class HATestUtil { } /** Gets the filesystem instance by setting the failover configurations */ - public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf) + public static DistributedFileSystem configureFailoverFs( + MiniDFSCluster cluster, Configuration conf) throws IOException, URISyntaxException { return configureFailoverFs(cluster, conf, 0); } @@ -147,13 +149,14 @@ public abstract class HATestUtil { * @param nsIndex namespace index starting with zero * @throws IOException if an error occurs rolling the edit log */ - public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf, + public static DistributedFileSystem configureFailoverFs( + MiniDFSCluster cluster, Configuration conf, int nsIndex) throws IOException, URISyntaxException { conf = new Configuration(conf); String logicalName = getLogicalHostname(cluster); setFailoverConfigurations(cluster, conf, logicalName, nsIndex); FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); - return fs; + return (DistributedFileSystem)fs; } public static void setFailoverConfigurations(MiniDFSCluster cluster,