diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index 4a8cbafda45..4bf17629f9d 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -363,7 +363,13 @@
-
+
+
+
+
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
new file mode 100644
index 00000000000..5a03b034c3e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Handle async calls. */
+@InterfaceAudience.Private
+public class AsyncCallHandler {
+ static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
+
+ private static final ThreadLocal>
+ LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
+ private static final ThreadLocal>
+ ASYNC_RETURN = new ThreadLocal<>();
+
+ /** @return the async return value from {@link AsyncCallHandler}. */
+ @InterfaceStability.Unstable
+ @SuppressWarnings("unchecked")
+ public static AsyncGet getAsyncReturn() {
+ final AsyncGet asyncGet = (AsyncGet)ASYNC_RETURN.get();
+ if (asyncGet != null) {
+ ASYNC_RETURN.set(null);
+ return asyncGet;
+ } else {
+ return (AsyncGet) getLowerLayerAsyncReturn();
+ }
+ }
+
+ /** For the lower rpc layers to set the async return value. */
+ @InterfaceStability.Unstable
+ public static void setLowerLayerAsyncReturn(
+ AsyncGet, Exception> asyncReturn) {
+ LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
+ }
+
+ private static AsyncGet, Exception> getLowerLayerAsyncReturn() {
+ final AsyncGet, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
+ Preconditions.checkNotNull(asyncGet);
+ LOWER_LAYER_ASYNC_RETURN.set(null);
+ return asyncGet;
+ }
+
+ /** A simple concurrent queue which keeping track the empty start time. */
+ static class ConcurrentQueue {
+ private final Queue queue = new LinkedList<>();
+ private long emptyStartTime = Time.monotonicNow();
+
+ synchronized int size() {
+ return queue.size();
+ }
+
+ /** Is the queue empty for more than the given time in millisecond? */
+ synchronized boolean isEmpty(long time) {
+ return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
+ }
+
+ synchronized void offer(T c) {
+ final boolean added = queue.offer(c);
+ Preconditions.checkState(added);
+ }
+
+ synchronized T poll() {
+ Preconditions.checkState(!queue.isEmpty());
+ final T t = queue.poll();
+ if (queue.isEmpty()) {
+ emptyStartTime = Time.monotonicNow();
+ }
+ return t;
+ }
+ }
+
+ /** A queue for handling async calls. */
+ static class AsyncCallQueue {
+ private final ConcurrentQueue queue = new ConcurrentQueue<>();
+ private final Processor processor = new Processor();
+
+ void addCall(AsyncCall call) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add " + call);
+ }
+ queue.offer(call);
+ processor.tryStart();
+ }
+
+ void checkCalls() {
+ final int size = queue.size();
+ for (int i = 0; i < size; i++) {
+ final AsyncCall c = queue.poll();
+ if (!c.isDone()) {
+ queue.offer(c); // the call is not done yet, add it back.
+ }
+ }
+ }
+
+ /** Process the async calls in the queue. */
+ private class Processor {
+ static final long GRACE_PERIOD = 10*1000L;
+ static final long SLEEP_PERIOD = 100L;
+
+ private final AtomicReference running = new AtomicReference<>();
+
+ boolean isRunning(Daemon d) {
+ return d == running.get();
+ }
+
+ void tryStart() {
+ final Thread current = Thread.currentThread();
+ if (running.compareAndSet(null, current)) {
+ final Daemon daemon = new Daemon() {
+ @Override
+ public void run() {
+ for (; isRunning(this);) {
+ try {
+ Thread.sleep(SLEEP_PERIOD);
+ } catch (InterruptedException e) {
+ kill(this);
+ return;
+ }
+
+ checkCalls();
+ tryStop(this);
+ }
+ }
+ };
+
+ final boolean set = running.compareAndSet(current, daemon);
+ Preconditions.checkState(set);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
+ }
+ daemon.start();
+ }
+ }
+
+ void tryStop(Daemon d) {
+ if (queue.isEmpty(GRACE_PERIOD)) {
+ kill(d);
+ }
+ }
+
+ void kill(Daemon d) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Killing " + d);
+ }
+ final boolean set = running.compareAndSet(d, null);
+ Preconditions.checkState(set);
+ }
+ }
+ }
+
+ static class AsyncValue {
+ private V value;
+
+ synchronized V waitAsyncValue(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ if (value != null) {
+ return value;
+ }
+ AsyncGet.Util.wait(this, timeout, unit);
+ if (value != null) {
+ return value;
+ }
+
+ throw new TimeoutException("waitCallReturn timed out "
+ + timeout + " " + unit);
+ }
+
+ synchronized void set(V v) {
+ Preconditions.checkNotNull(v);
+ Preconditions.checkState(value == null);
+ value = v;
+ notify();
+ }
+
+ synchronized boolean isDone() {
+ return value != null;
+ }
+ }
+
+ static class AsyncCall extends RetryInvocationHandler.Call {
+ private final AsyncCallHandler asyncCallHandler;
+
+ private final AsyncValue asyncCallReturn = new AsyncValue<>();
+ private AsyncGet, Exception> lowerLayerAsyncGet;
+
+ AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
+ RetryInvocationHandler.Counters counters,
+ RetryInvocationHandler> retryInvocationHandler,
+ AsyncCallHandler asyncCallHandler) {
+ super(method, args, isRpc, callId, counters, retryInvocationHandler);
+
+ this.asyncCallHandler = asyncCallHandler;
+ }
+
+ /** @return true if the call is done; otherwise, return false. */
+ boolean isDone() {
+ final CallReturn r = invokeOnce();
+ switch (r.getState()) {
+ case RETURNED:
+ case EXCEPTION:
+ asyncCallReturn.set(r); // the async call is done
+ return true;
+ case RETRY:
+ invokeOnce();
+ break;
+ case ASYNC_CALL_IN_PROGRESS:
+ case ASYNC_INVOKED:
+ // nothing to do
+ break;
+ default:
+ Preconditions.checkState(false);
+ }
+ return false;
+ }
+
+ @Override
+ CallReturn invoke() throws Throwable {
+ LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
+ if (lowerLayerAsyncGet != null) {
+ // async call was submitted early, check the lower level async call
+ final boolean isDone = lowerLayerAsyncGet.isDone();
+ LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
+ if (!isDone) {
+ return CallReturn.ASYNC_CALL_IN_PROGRESS;
+ }
+ try {
+ return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
+ } finally {
+ lowerLayerAsyncGet = null;
+ }
+ }
+
+ // submit a new async call
+ LOG.trace("invoke: ASYNC_INVOKED");
+ final boolean mode = Client.isAsynchronousMode();
+ try {
+ Client.setAsynchronousMode(true);
+ final Object r = invokeMethod();
+ // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
+ Preconditions.checkState(r == null);
+ lowerLayerAsyncGet = getLowerLayerAsyncReturn();
+
+ if (counters.isZeros()) {
+ // first async attempt, initialize
+ LOG.trace("invoke: initAsyncCall");
+ asyncCallHandler.initAsyncCall(this, asyncCallReturn);
+ }
+ return CallReturn.ASYNC_INVOKED;
+ } finally {
+ Client.setAsynchronousMode(mode);
+ }
+ }
+ }
+
+ private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
+ private volatile boolean hasSuccessfulCall = false;
+
+ AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
+ int callId, RetryInvocationHandler.Counters counters,
+ RetryInvocationHandler> retryInvocationHandler) {
+ return new AsyncCall(method, args, isRpc, callId, counters,
+ retryInvocationHandler, this);
+ }
+
+ boolean hasSuccessfulCall() {
+ return hasSuccessfulCall;
+ }
+
+ private void initAsyncCall(final AsyncCall asyncCall,
+ final AsyncValue asyncCallReturn) {
+ asyncCalls.addCall(asyncCall);
+
+ final AsyncGet