diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 892df897c67..afad06683ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -18,10 +18,46 @@ package org.apache.hadoop.ipc; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.CodedOutputStream; +import static org.apache.hadoop.ipc.RpcConstants.*; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.net.SocketFactory; +import javax.security.sasl.Sasl; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.AsyncGet; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.htrace.core.Span; import org.apache.htrace.core.Tracer; -import javax.net.SocketFactory; -import javax.security.sasl.Sasl; -import java.io.*; -import java.net.*; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; -import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.CodedOutputStream; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -94,8 +119,8 @@ public class Client implements AutoCloseable { private static final ThreadLocal callId = new ThreadLocal(); private static final ThreadLocal retryCount = new ThreadLocal(); - private static final ThreadLocal> ASYNC_RPC_RESPONSE - = new ThreadLocal<>(); + private static final ThreadLocal> + RETURN_RPC_RESPONSE = new ThreadLocal<>(); private static final ThreadLocal asynchronousMode = new ThreadLocal() { @Override @@ -106,8 +131,8 @@ public class Client implements AutoCloseable { @SuppressWarnings("unchecked") @Unstable - public static Future getAsyncRpcResponse() { - return (Future) ASYNC_RPC_RESPONSE.get(); + public static Future getReturnRpcResponse() { + return (Future) RETURN_RPC_RESPONSE.get(); } /** Set call id and retry count for the next call. */ @@ -356,11 +381,6 @@ public class Client implements AutoCloseable { } } - @Override - public String toString() { - return getClass().getSimpleName() + id; - } - /** Indicate when the call is complete and the * value or error are available. Notifies by default. */ protected synchronized void callComplete() { @@ -1395,32 +1415,27 @@ public class Client implements AutoCloseable { } if (isAsynchronousMode()) { - final AsyncGet asyncGet - = new AsyncGet() { + Future returnFuture = new AbstractFuture() { + private final AtomicBoolean callled = new AtomicBoolean(false); @Override - public Writable get(long timeout, TimeUnit unit) - throws IOException, TimeoutException{ - boolean done = true; - try { - final Writable w = getRpcResponse(call, connection, timeout, unit); - if (w == null) { - done = false; - throw new TimeoutException(call + " timed out " - + timeout + " " + unit); - } - return w; - } finally { - if (done) { + public Writable get() throws InterruptedException, ExecutionException { + if (callled.compareAndSet(false, true)) { + try { + set(getRpcResponse(call, connection)); + } catch (IOException ie) { + setException(ie); + } finally { releaseAsyncCall(); } } + return super.get(); } }; - ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet)); + RETURN_RPC_RESPONSE.set(returnFuture); return null; } else { - return getRpcResponse(call, connection, -1, null); + return getRpcResponse(call, connection); } } @@ -1456,18 +1471,12 @@ public class Client implements AutoCloseable { return asyncCallCounter.get(); } - /** @return the rpc response or, in case of timeout, null. */ - private Writable getRpcResponse(final Call call, final Connection connection, - final long timeout, final TimeUnit unit) throws IOException { + private Writable getRpcResponse(final Call call, final Connection connection) + throws IOException { synchronized (call) { while (!call.done) { try { - final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout( - timeout, unit); - call.wait(waitTimeout); // wait for the result - if (waitTimeout > 0 && !call.done) { - return null; - } + call.wait(); // wait for the result } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new InterruptedIOException("Call interrupted"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 4641a6778a9..350e041aca9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -18,9 +18,21 @@ package org.apache.hadoop.ipc; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.*; -import com.google.protobuf.Descriptors.MethodDescriptor; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.SocketFactory; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.AsyncGet; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; -import javax.net.SocketFactory; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.GeneratedMessage; +import com.google.protobuf.Message; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; /** * RPC Engine for for protobuf based RPCs. @@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean; @InterfaceStability.Evolving public class ProtobufRpcEngine implements RpcEngine { public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class); - private static final ThreadLocal> - ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); + private static final ThreadLocal> + RETURN_MESSAGE_CALLBACK = new ThreadLocal<>(); static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( @@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ClientCache CLIENTS = new ClientCache(); + @SuppressWarnings("unchecked") @Unstable - public static AsyncGet getAsyncReturnMessage() { - return ASYNC_RETURN_MESSAGE.get(); + public static Callable getReturnMessageCallback() { + return (Callable) RETURN_MESSAGE_CALLBACK.get(); } public ProtocolProxy getProxy(Class protocol, long clientVersion, @@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine { } if (Client.isAsynchronousMode()) { - final Future frrw = Client.getAsyncRpcResponse(); - final AsyncGet asyncGet - = new AsyncGet() { + final Future frrw = Client.getReturnRpcResponse(); + Callable callback = new Callable() { @Override - public Message get(long timeout, TimeUnit unit) throws Exception { - final RpcResponseWrapper rrw = timeout < 0? - frrw.get(): frrw.get(timeout, unit); - return getReturnMessage(method, rrw); + public Message call() throws Exception { + return getReturnMessage(method, frrw.get()); } }; - ASYNC_RETURN_MESSAGE.set(asyncGet); + RETURN_MESSAGE_CALLBACK.set(callback); return null; } else { return getReturnMessage(method, val); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java deleted file mode 100644 index 5eac869632e..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.util.concurrent; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * This interface defines an asynchronous {@link #get(long, TimeUnit)} method. - * - * When the return value is still being computed, invoking - * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}. - * The method should be invoked again and again - * until the underlying computation is completed. - * - * @param The type of the return value. - * @param The exception type that the underlying implementation may throw. - */ -public interface AsyncGet { - /** - * Get the result. - * - * @param timeout The maximum time period to wait. - * When timeout == 0, it does not wait at all. - * When timeout < 0, it waits indefinitely. - * @param unit The unit of the timeout value - * @return the result, which is possibly null. - * @throws E an exception thrown by the underlying implementation. - * @throws TimeoutException if it cannot return after the given time period. - * @throws InterruptedException if the thread is interrupted. - */ - R get(long timeout, TimeUnit unit) - throws E, TimeoutException, InterruptedException; - - /** Utility */ - class Util { - /** - * @return {@link Object#wait(long)} timeout converted - * from {@link #get(long, TimeUnit)} timeout. - */ - public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){ - return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout); - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java deleted file mode 100644 index d6878670ffe..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.util.concurrent; - -import com.google.common.util.concurrent.AbstractFuture; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** A {@link Future} implemented using an {@link AsyncGet} object. */ -public class AsyncGetFuture extends AbstractFuture { - public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class); - - private final AtomicBoolean called = new AtomicBoolean(false); - private final AsyncGet asyncGet; - - public AsyncGetFuture(AsyncGet asyncGet) { - this.asyncGet = asyncGet; - } - - private void callAsyncGet(long timeout, TimeUnit unit) { - if (!isCancelled() && called.compareAndSet(false, true)) { - try { - set(asyncGet.get(timeout, unit)); - } catch (TimeoutException te) { - LOG.trace("TRACE", te); - called.compareAndSet(true, false); - } catch (Throwable e) { - LOG.trace("TRACE", e); - setException(e); - } - } - } - - @Override - public T get() throws InterruptedException, ExecutionException { - callAsyncGet(-1, TimeUnit.MILLISECONDS); - return super.get(); - } - - @Override - public T get(long timeout, TimeUnit unit) - throws InterruptedException, TimeoutException, ExecutionException { - callAsyncGet(timeout, unit); - return super.get(0, TimeUnit.MILLISECONDS); - } - - @Override - public boolean isDone() { - callAsyncGet(0, TimeUnit.MILLISECONDS); - return super.isDone(); - } -} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 7623975bf17..8ee3a2c64d3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -18,6 +18,20 @@ package org.apache.hadoop.ipc; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,17 +48,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - public class TestAsyncIPC { private static Configuration conf; @@ -84,51 +87,26 @@ public class TestAsyncIPC { try { final long param = TestIPC.RANDOM.nextLong(); TestIPC.call(client, param, server, conf); - Future returnFuture = Client.getAsyncRpcResponse(); + Future returnFuture = Client.getReturnRpcResponse(); returnFutures.put(i, returnFuture); expectedValues.put(i, param); } catch (Exception e) { + LOG.fatal("Caught: " + StringUtils.stringifyException(e)); failed = true; - throw new RuntimeException(e); } } } - void assertReturnValues() throws InterruptedException, ExecutionException { + public void waitForReturnValues() throws InterruptedException, + ExecutionException { for (int i = 0; i < count; i++) { LongWritable value = returnFutures.get(i).get(); - Assert.assertEquals("call" + i + " failed.", - expectedValues.get(i).longValue(), value.get()); - } - Assert.assertFalse(failed); - } - - void assertReturnValues(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException { - final boolean[] checked = new boolean[count]; - for(boolean done = false; !done;) { - done = true; - for (int i = 0; i < count; i++) { - if (checked[i]) { - continue; - } else { - done = false; - } - - final LongWritable value; - try { - value = returnFutures.get(i).get(timeout, unit); - } catch (TimeoutException e) { - LOG.info("call" + i + " caught ", e); - continue; - } - - Assert.assertEquals("call" + i + " failed.", - expectedValues.get(i).longValue(), value.get()); - checked[i] = true; + if (expectedValues.get(i) != value.get()) { + LOG.fatal(String.format("Call-%d failed!", i)); + failed = true; + break; } } - Assert.assertFalse(failed); } } @@ -205,7 +183,7 @@ public class TestAsyncIPC { private void doCall(final int idx, final long param) throws IOException { TestIPC.call(client, param, server, conf); - Future returnFuture = Client.getAsyncRpcResponse(); + Future returnFuture = Client.getReturnRpcResponse(); returnFutures.put(idx, returnFuture); expectedValues.put(idx, param); } @@ -255,7 +233,10 @@ public class TestAsyncIPC { } for (int i = 0; i < callerCount; i++) { callers[i].join(); - callers[i].assertReturnValues(); + callers[i].waitForReturnValues(); + String msg = String.format("Expected not failed for caller-%d: %s.", i, + callers[i]); + assertFalse(msg, callers[i].failed); } for (int i = 0; i < clientCount; i++) { clients[i].stop(); @@ -277,37 +258,25 @@ public class TestAsyncIPC { try { AsyncCaller caller = new AsyncCaller(client, addr, callCount); caller.run(); - caller.assertReturnValues(); - caller.assertReturnValues(); - caller.assertReturnValues(); - Assert.assertEquals(asyncCallCount, client.getAsyncCallCount()); + + caller.waitForReturnValues(); + String msg = String.format( + "First time, expected not failed for caller: %s.", caller); + assertFalse(msg, caller.failed); + + caller.waitForReturnValues(); + assertTrue(asyncCallCount == client.getAsyncCallCount()); + msg = String.format("Second time, expected not failed for caller: %s.", + caller); + assertFalse(msg, caller.failed); + + assertTrue(asyncCallCount == client.getAsyncCallCount()); } finally { client.stop(); server.stop(); } } - @Test(timeout = 60000) - public void testFutureGetWithTimeout() throws IOException, - InterruptedException, ExecutionException { -// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL); - final Server server = new TestIPC.TestServer(10, true, conf); - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - server.start(); - - final Client client = new Client(LongWritable.class, conf); - - try { - final AsyncCaller caller = new AsyncCaller(client, addr, 10); - caller.run(); - caller.assertReturnValues(10, TimeUnit.MILLISECONDS); - } finally { - client.stop(); - server.stop(); - } - } - - public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep, int clientCount, int callerCount, int callCount) throws IOException, InterruptedException, ExecutionException { @@ -398,7 +367,9 @@ public class TestAsyncIPC { server.start(); final AsyncCaller caller = new AsyncCaller(client, addr, 4); caller.run(); - caller.assertReturnValues(); + caller.waitForReturnValues(); + String msg = String.format("Expected not failed for caller: %s.", caller); + assertFalse(msg, caller.failed); } finally { client.stop(); server.stop(); @@ -435,7 +406,9 @@ public class TestAsyncIPC { server.start(); final AsyncCaller caller = new AsyncCaller(client, addr, 10); caller.run(); - caller.assertReturnValues(); + caller.waitForReturnValues(); + String msg = String.format("Expected not failed for caller: %s.", caller); + assertFalse(msg, caller.failed); } finally { client.stop(); server.stop(); @@ -470,7 +443,9 @@ public class TestAsyncIPC { server.start(); final AsyncCaller caller = new AsyncCaller(client, addr, 10); caller.run(); - caller.assertReturnValues(); + caller.waitForReturnValues(); + String msg = String.format("Expected not failed for caller: %s.", caller); + assertFalse(msg, caller.failed); } finally { client.stop(); server.stop(); @@ -514,7 +489,10 @@ public class TestAsyncIPC { } for (int i = 0; i < callerCount; ++i) { callers[i].join(); - callers[i].assertReturnValues(); + callers[i].waitForReturnValues(); + String msg = String.format("Expected not failed for caller-%d: %s.", i, + callers[i]); + assertFalse(msg, callers[i].failed); } } finally { client.stop(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 1f60df2e386..4fe0861921c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -19,17 +19,20 @@ package org.apache.hadoop.hdfs; import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; -import org.apache.hadoop.util.concurrent.AsyncGet; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; import org.apache.hadoop.ipc.Client; +import com.google.common.util.concurrent.AbstractFuture; + /**************************************************************** * Implementation of the asynchronous distributed file system. * This instance of this class is the way end-user code interacts @@ -49,9 +52,22 @@ public class AsyncDistributedFileSystem { } static Future getReturnValue() { - final AsyncGet asyncGet - = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue(); - return new AsyncGetFuture<>(asyncGet); + final Callable returnValueCallback = ClientNamenodeProtocolTranslatorPB + .getReturnValueCallback(); + Future returnFuture = new AbstractFuture() { + private final AtomicBoolean called = new AtomicBoolean(false); + public T get() throws InterruptedException, ExecutionException { + if (called.compareAndSet(false, true)) { + try { + set(returnValueCallback.call()); + } catch (Exception e) { + setException(e); + } + } + return super.get(); + } + }; + return returnFuture; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 796aa29d7f7..28ac78d70fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -24,8 +24,7 @@ import java.util.EnumSet; import java.util.List; import com.google.common.collect.Lists; - -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Callable; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -192,7 +191,6 @@ import org.apache.hadoop.security.token.Token; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; -import org.apache.hadoop.util.concurrent.AsyncGet; /** * This class forwards NN's ClientProtocol calls as RPC calls to the NN server @@ -204,8 +202,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet; public class ClientNamenodeProtocolTranslatorPB implements ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { final private ClientNamenodeProtocolPB rpcProxy; - private static final ThreadLocal> - ASYNC_RETURN_VALUE = new ThreadLocal<>(); + private static final ThreadLocal> + RETURN_VALUE_CALLBACK = new ThreadLocal<>(); static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = GetServerDefaultsRequestProto.newBuilder().build(); @@ -240,8 +238,8 @@ public class ClientNamenodeProtocolTranslatorPB implements @SuppressWarnings("unchecked") @Unstable - public static AsyncGet getAsyncReturnValue() { - return (AsyncGet) ASYNC_RETURN_VALUE.get(); + public static Callable getReturnValueCallback() { + return (Callable) RETURN_VALUE_CALLBACK.get(); } @Override @@ -363,7 +361,7 @@ public class ClientNamenodeProtocolTranslatorPB implements try { if (Client.isAsynchronousMode()) { rpcProxy.setPermission(null, req); - setAsyncReturnValue(); + setReturnValueCallback(); } else { rpcProxy.setPermission(null, req); } @@ -372,18 +370,17 @@ public class ClientNamenodeProtocolTranslatorPB implements } } - private void setAsyncReturnValue() { - final AsyncGet asyncReturnMessage - = ProtobufRpcEngine.getAsyncReturnMessage(); - final AsyncGet asyncGet - = new AsyncGet() { + private void setReturnValueCallback() { + final Callable returnMessageCallback = ProtobufRpcEngine + .getReturnMessageCallback(); + Callable callBack = new Callable() { @Override - public Void get(long timeout, TimeUnit unit) throws Exception { - asyncReturnMessage.get(timeout, unit); + public Void call() throws Exception { + returnMessageCallback.call(); return null; } }; - ASYNC_RETURN_VALUE.set(asyncGet); + RETURN_VALUE_CALLBACK.set(callBack); } @Override @@ -398,7 +395,7 @@ public class ClientNamenodeProtocolTranslatorPB implements try { if (Client.isAsynchronousMode()) { rpcProxy.setOwner(null, req.build()); - setAsyncReturnValue(); + setReturnValueCallback(); } else { rpcProxy.setOwner(null, req.build()); } @@ -530,7 +527,7 @@ public class ClientNamenodeProtocolTranslatorPB implements try { if (Client.isAsynchronousMode()) { rpcProxy.rename2(null, req); - setAsyncReturnValue(); + setReturnValueCallback(); } else { rpcProxy.rename2(null, req); }