Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."

This reverts commit 658072d62e.
This commit is contained in:
Andrew Wang 2016-06-03 18:10:49 -07:00
parent 87ea0784eb
commit 071aeab585
7 changed files with 186 additions and 315 deletions

View File

@ -18,10 +18,46 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import com.google.common.annotations.VisibleForTesting; import static org.apache.hadoop.ipc.RpcConstants.*;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.BufferedInputStream;
import com.google.protobuf.CodedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
import javax.security.sasl.Sasl;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span; import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer; import org.apache.htrace.core.Tracer;
import javax.net.SocketFactory; import com.google.common.annotations.VisibleForTesting;
import javax.security.sasl.Sasl; import com.google.common.base.Preconditions;
import java.io.*; import com.google.common.util.concurrent.AbstractFuture;
import java.net.*; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.security.PrivilegedExceptionAction; import com.google.protobuf.CodedOutputStream;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a /** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
@ -94,8 +119,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE private static final ThreadLocal<Future<?>>
= new ThreadLocal<>(); RETURN_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode = private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() { new ThreadLocal<Boolean>() {
@Override @Override
@ -106,8 +131,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Unstable @Unstable
public static <T> Future<T> getAsyncRpcResponse() { public static <T> Future<T> getReturnRpcResponse() {
return (Future<T>) ASYNC_RPC_RESPONSE.get(); return (Future<T>) RETURN_RPC_RESPONSE.get();
} }
/** Set call id and retry count for the next call. */ /** Set call id and retry count for the next call. */
@ -356,11 +381,6 @@ public class Client implements AutoCloseable {
} }
} }
@Override
public String toString() {
return getClass().getSimpleName() + id;
}
/** Indicate when the call is complete and the /** Indicate when the call is complete and the
* value or error are available. Notifies by default. */ * value or error are available. Notifies by default. */
protected synchronized void callComplete() { protected synchronized void callComplete() {
@ -1395,32 +1415,27 @@ public class Client implements AutoCloseable {
} }
if (isAsynchronousMode()) { if (isAsynchronousMode()) {
final AsyncGet<Writable, IOException> asyncGet Future<Writable> returnFuture = new AbstractFuture<Writable>() {
= new AsyncGet<Writable, IOException>() { private final AtomicBoolean callled = new AtomicBoolean(false);
@Override @Override
public Writable get(long timeout, TimeUnit unit) public Writable get() throws InterruptedException, ExecutionException {
throws IOException, TimeoutException{ if (callled.compareAndSet(false, true)) {
boolean done = true; try {
try { set(getRpcResponse(call, connection));
final Writable w = getRpcResponse(call, connection, timeout, unit); } catch (IOException ie) {
if (w == null) { setException(ie);
done = false; } finally {
throw new TimeoutException(call + " timed out "
+ timeout + " " + unit);
}
return w;
} finally {
if (done) {
releaseAsyncCall(); releaseAsyncCall();
} }
} }
return super.get();
} }
}; };
ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet)); RETURN_RPC_RESPONSE.set(returnFuture);
return null; return null;
} else { } else {
return getRpcResponse(call, connection, -1, null); return getRpcResponse(call, connection);
} }
} }
@ -1456,18 +1471,12 @@ public class Client implements AutoCloseable {
return asyncCallCounter.get(); return asyncCallCounter.get();
} }
/** @return the rpc response or, in case of timeout, null. */ private Writable getRpcResponse(final Call call, final Connection connection)
private Writable getRpcResponse(final Call call, final Connection connection, throws IOException {
final long timeout, final TimeUnit unit) throws IOException {
synchronized (call) { synchronized (call) {
while (!call.done) { while (!call.done) {
try { try {
final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout( call.wait(); // wait for the result
timeout, unit);
call.wait(waitTimeout); // wait for the result
if (waitTimeout > 0 && !call.done) {
return null;
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted"); throw new InterruptedIOException("Call interrupted");

View File

@ -18,9 +18,21 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import com.google.common.annotations.VisibleForTesting; import java.io.DataInput;
import com.google.protobuf.*; import java.io.DataOutput;
import com.google.protobuf.Descriptors.MethodDescriptor; import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer; import org.apache.htrace.core.Tracer;
import javax.net.SocketFactory; import com.google.common.annotations.VisibleForTesting;
import java.io.DataInput; import com.google.protobuf.BlockingService;
import java.io.DataOutput; import com.google.protobuf.CodedOutputStream;
import java.io.IOException; import com.google.protobuf.Descriptors.MethodDescriptor;
import java.io.OutputStream; import com.google.protobuf.GeneratedMessage;
import java.lang.reflect.Method; import com.google.protobuf.Message;
import java.lang.reflect.Proxy; import com.google.protobuf.ServiceException;
import java.net.InetSocketAddress; import com.google.protobuf.TextFormat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* RPC Engine for for protobuf based RPCs. * RPC Engine for for protobuf based RPCs.
@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine { public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class); public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
private static final ThreadLocal<AsyncGet<Message, Exception>> private static final ThreadLocal<Callable<?>>
ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for WritableRpcEngine static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine( org.apache.hadoop.ipc.Server.registerProtocolEngine(
@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache(); private static final ClientCache CLIENTS = new ClientCache();
@SuppressWarnings("unchecked")
@Unstable @Unstable
public static AsyncGet<Message, Exception> getAsyncReturnMessage() { public static <T> Callable<T> getReturnMessageCallback() {
return ASYNC_RETURN_MESSAGE.get(); return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
} }
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine {
} }
if (Client.isAsynchronousMode()) { if (Client.isAsynchronousMode()) {
final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse(); final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
final AsyncGet<Message, Exception> asyncGet Callable<Message> callback = new Callable<Message>() {
= new AsyncGet<Message, Exception>() {
@Override @Override
public Message get(long timeout, TimeUnit unit) throws Exception { public Message call() throws Exception {
final RpcResponseWrapper rrw = timeout < 0? return getReturnMessage(method, frrw.get());
frrw.get(): frrw.get(timeout, unit);
return getReturnMessage(method, rrw);
} }
}; };
ASYNC_RETURN_MESSAGE.set(asyncGet); RETURN_MESSAGE_CALLBACK.set(callback);
return null; return null;
} else { } else {
return getReturnMessage(method, val); return getReturnMessage(method, val);

View File

@ -1,60 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
*
* When the return value is still being computed, invoking
* {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
* The method should be invoked again and again
* until the underlying computation is completed.
*
* @param <R> The type of the return value.
* @param <E> The exception type that the underlying implementation may throw.
*/
public interface AsyncGet<R, E extends Throwable> {
/**
* Get the result.
*
* @param timeout The maximum time period to wait.
* When timeout == 0, it does not wait at all.
* When timeout < 0, it waits indefinitely.
* @param unit The unit of the timeout value
* @return the result, which is possibly null.
* @throws E an exception thrown by the underlying implementation.
* @throws TimeoutException if it cannot return after the given time period.
* @throws InterruptedException if the thread is interrupted.
*/
R get(long timeout, TimeUnit unit)
throws E, TimeoutException, InterruptedException;
/** Utility */
class Util {
/**
* @return {@link Object#wait(long)} timeout converted
* from {@link #get(long, TimeUnit)} timeout.
*/
public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
}
}
}

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.concurrent;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
/** A {@link Future} implemented using an {@link AsyncGet} object. */
public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
private final AtomicBoolean called = new AtomicBoolean(false);
private final AsyncGet<T, E> asyncGet;
public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
this.asyncGet = asyncGet;
}
private void callAsyncGet(long timeout, TimeUnit unit) {
if (!isCancelled() && called.compareAndSet(false, true)) {
try {
set(asyncGet.get(timeout, unit));
} catch (TimeoutException te) {
LOG.trace("TRACE", te);
called.compareAndSet(true, false);
} catch (Throwable e) {
LOG.trace("TRACE", e);
setException(e);
}
}
}
@Override
public T get() throws InterruptedException, ExecutionException {
callAsyncGet(-1, TimeUnit.MILLISECONDS);
return super.get();
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {
callAsyncGet(timeout, unit);
return super.get(0, TimeUnit.MILLISECONDS);
}
@Override
public boolean isDone() {
callAsyncGet(0, TimeUnit.MILLISECONDS);
return super.isDone();
}
}

View File

@ -18,6 +18,20 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -34,17 +48,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class TestAsyncIPC { public class TestAsyncIPC {
private static Configuration conf; private static Configuration conf;
@ -84,51 +87,26 @@ public class TestAsyncIPC {
try { try {
final long param = TestIPC.RANDOM.nextLong(); final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf); TestIPC.call(client, param, server, conf);
Future<LongWritable> returnFuture = Client.getAsyncRpcResponse(); Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(i, returnFuture); returnFutures.put(i, returnFuture);
expectedValues.put(i, param); expectedValues.put(i, param);
} catch (Exception e) { } catch (Exception e) {
LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true; failed = true;
throw new RuntimeException(e);
} }
} }
} }
void assertReturnValues() throws InterruptedException, ExecutionException { public void waitForReturnValues() throws InterruptedException,
ExecutionException {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
LongWritable value = returnFutures.get(i).get(); LongWritable value = returnFutures.get(i).get();
Assert.assertEquals("call" + i + " failed.", if (expectedValues.get(i) != value.get()) {
expectedValues.get(i).longValue(), value.get()); LOG.fatal(String.format("Call-%d failed!", i));
} failed = true;
Assert.assertFalse(failed); break;
}
void assertReturnValues(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
final boolean[] checked = new boolean[count];
for(boolean done = false; !done;) {
done = true;
for (int i = 0; i < count; i++) {
if (checked[i]) {
continue;
} else {
done = false;
}
final LongWritable value;
try {
value = returnFutures.get(i).get(timeout, unit);
} catch (TimeoutException e) {
LOG.info("call" + i + " caught ", e);
continue;
}
Assert.assertEquals("call" + i + " failed.",
expectedValues.get(i).longValue(), value.get());
checked[i] = true;
} }
} }
Assert.assertFalse(failed);
} }
} }
@ -205,7 +183,7 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException { private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf); TestIPC.call(client, param, server, conf);
Future<LongWritable> returnFuture = Client.getAsyncRpcResponse(); Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(idx, returnFuture); returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param); expectedValues.put(idx, param);
} }
@ -255,7 +233,10 @@ public class TestAsyncIPC {
} }
for (int i = 0; i < callerCount; i++) { for (int i = 0; i < callerCount; i++) {
callers[i].join(); callers[i].join();
callers[i].assertReturnValues(); callers[i].waitForReturnValues();
String msg = String.format("Expected not failed for caller-%d: %s.", i,
callers[i]);
assertFalse(msg, callers[i].failed);
} }
for (int i = 0; i < clientCount; i++) { for (int i = 0; i < clientCount; i++) {
clients[i].stop(); clients[i].stop();
@ -277,37 +258,25 @@ public class TestAsyncIPC {
try { try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount); AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run(); caller.run();
caller.assertReturnValues();
caller.assertReturnValues(); caller.waitForReturnValues();
caller.assertReturnValues(); String msg = String.format(
Assert.assertEquals(asyncCallCount, client.getAsyncCallCount()); "First time, expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
caller.waitForReturnValues();
assertTrue(asyncCallCount == client.getAsyncCallCount());
msg = String.format("Second time, expected not failed for caller: %s.",
caller);
assertFalse(msg, caller.failed);
assertTrue(asyncCallCount == client.getAsyncCallCount());
} finally { } finally {
client.stop(); client.stop();
server.stop(); server.stop();
} }
} }
@Test(timeout = 60000)
public void testFutureGetWithTimeout() throws IOException,
InterruptedException, ExecutionException {
// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
final Server server = new TestIPC.TestServer(10, true, conf);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final Client client = new Client(LongWritable.class, conf);
try {
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
} finally {
client.stop();
server.stop();
}
}
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep, public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException, int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException { InterruptedException, ExecutionException {
@ -398,7 +367,9 @@ public class TestAsyncIPC {
server.start(); server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 4); final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run(); caller.run();
caller.assertReturnValues(); caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
} finally { } finally {
client.stop(); client.stop();
server.stop(); server.stop();
@ -435,7 +406,9 @@ public class TestAsyncIPC {
server.start(); server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10); final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run(); caller.run();
caller.assertReturnValues(); caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
} finally { } finally {
client.stop(); client.stop();
server.stop(); server.stop();
@ -470,7 +443,9 @@ public class TestAsyncIPC {
server.start(); server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10); final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run(); caller.run();
caller.assertReturnValues(); caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
} finally { } finally {
client.stop(); client.stop();
server.stop(); server.stop();
@ -514,7 +489,10 @@ public class TestAsyncIPC {
} }
for (int i = 0; i < callerCount; ++i) { for (int i = 0; i < callerCount; ++i) {
callers[i].join(); callers[i].join();
callers[i].assertReturnValues(); callers[i].waitForReturnValues();
String msg = String.format("Expected not failed for caller-%d: %s.", i,
callers[i]);
assertFalse(msg, callers[i].failed);
} }
} finally { } finally {
client.stop(); client.stop();

View File

@ -19,17 +19,20 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import com.google.common.util.concurrent.AbstractFuture;
/**************************************************************** /****************************************************************
* Implementation of the asynchronous distributed file system. * Implementation of the asynchronous distributed file system.
* This instance of this class is the way end-user code interacts * This instance of this class is the way end-user code interacts
@ -49,9 +52,22 @@ public class AsyncDistributedFileSystem {
} }
static <T> Future<T> getReturnValue() { static <T> Future<T> getReturnValue() {
final AsyncGet<T, Exception> asyncGet final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
= ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue(); .getReturnValueCallback();
return new AsyncGetFuture<>(asyncGet); Future<T> returnFuture = new AbstractFuture<T>() {
private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
if (called.compareAndSet(false, true)) {
try {
set(returnValueCallback.call());
} catch (Exception e) {
setException(e);
}
}
return super.get();
}
};
return returnFuture;
} }
/** /**

View File

@ -24,8 +24,7 @@ import java.util.EnumSet;
import java.util.List; import java.util.List;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -192,7 +191,6 @@ import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.util.concurrent.AsyncGet;
/** /**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@ -204,8 +202,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
public class ClientNamenodeProtocolTranslatorPB implements public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator { ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy; final private ClientNamenodeProtocolPB rpcProxy;
private static final ThreadLocal<AsyncGet<?, Exception>> private static final ThreadLocal<Callable<?>>
ASYNC_RETURN_VALUE = new ThreadLocal<>(); RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST = static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build(); GetServerDefaultsRequestProto.newBuilder().build();
@ -240,8 +238,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Unstable @Unstable
public static <T> AsyncGet<T, Exception> getAsyncReturnValue() { public static <T> Callable<T> getReturnValueCallback() {
return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get(); return (Callable<T>) RETURN_VALUE_CALLBACK.get();
} }
@Override @Override
@ -363,7 +361,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try { try {
if (Client.isAsynchronousMode()) { if (Client.isAsynchronousMode()) {
rpcProxy.setPermission(null, req); rpcProxy.setPermission(null, req);
setAsyncReturnValue(); setReturnValueCallback();
} else { } else {
rpcProxy.setPermission(null, req); rpcProxy.setPermission(null, req);
} }
@ -372,18 +370,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
} }
} }
private void setAsyncReturnValue() { private void setReturnValueCallback() {
final AsyncGet<Message, Exception> asyncReturnMessage final Callable<Message> returnMessageCallback = ProtobufRpcEngine
= ProtobufRpcEngine.getAsyncReturnMessage(); .getReturnMessageCallback();
final AsyncGet<Void, Exception> asyncGet Callable<Void> callBack = new Callable<Void>() {
= new AsyncGet<Void, Exception>() {
@Override @Override
public Void get(long timeout, TimeUnit unit) throws Exception { public Void call() throws Exception {
asyncReturnMessage.get(timeout, unit); returnMessageCallback.call();
return null; return null;
} }
}; };
ASYNC_RETURN_VALUE.set(asyncGet); RETURN_VALUE_CALLBACK.set(callBack);
} }
@Override @Override
@ -398,7 +395,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try { try {
if (Client.isAsynchronousMode()) { if (Client.isAsynchronousMode()) {
rpcProxy.setOwner(null, req.build()); rpcProxy.setOwner(null, req.build());
setAsyncReturnValue(); setReturnValueCallback();
} else { } else {
rpcProxy.setOwner(null, req.build()); rpcProxy.setOwner(null, req.build());
} }
@ -530,7 +527,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try { try {
if (Client.isAsynchronousMode()) { if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req); rpcProxy.rename2(null, req);
setAsyncReturnValue(); setReturnValueCallback();
} else { } else {
rpcProxy.rename2(null, req); rpcProxy.rename2(null, req);
} }