Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."
This reverts commit fa7c7f2510
.
This commit is contained in:
parent
1de712f22a
commit
308d28640d
|
@ -18,10 +18,46 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import static org.apache.hadoop.ipc.RpcConstants.*;
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import java.io.BufferedInputStream;
|
||||||
import com.google.protobuf.CodedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
|
import java.io.FilterInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import javax.net.SocketFactory;
|
||||||
|
import javax.security.sasl.Sasl;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGet;
|
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
|
|
||||||
import org.apache.htrace.core.Span;
|
import org.apache.htrace.core.Span;
|
||||||
import org.apache.htrace.core.Tracer;
|
import org.apache.htrace.core.Tracer;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import javax.security.sasl.Sasl;
|
import com.google.common.base.Preconditions;
|
||||||
import java.io.*;
|
import com.google.common.util.concurrent.AbstractFuture;
|
||||||
import java.net.*;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import com.google.protobuf.CodedOutputStream;
|
||||||
import java.util.*;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.concurrent.*;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
|
||||||
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
|
||||||
|
|
||||||
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
|
||||||
* parameter, and return a {@link Writable} as their value. A service runs on
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
||||||
|
@ -94,8 +119,8 @@ public class Client implements AutoCloseable {
|
||||||
|
|
||||||
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
|
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
|
||||||
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
|
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
|
||||||
private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
|
private static final ThreadLocal<Future<?>>
|
||||||
= new ThreadLocal<>();
|
RETURN_RPC_RESPONSE = new ThreadLocal<>();
|
||||||
private static final ThreadLocal<Boolean> asynchronousMode =
|
private static final ThreadLocal<Boolean> asynchronousMode =
|
||||||
new ThreadLocal<Boolean>() {
|
new ThreadLocal<Boolean>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -106,8 +131,8 @@ public class Client implements AutoCloseable {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Unstable
|
@Unstable
|
||||||
public static <T> Future<T> getAsyncRpcResponse() {
|
public static <T> Future<T> getReturnRpcResponse() {
|
||||||
return (Future<T>) ASYNC_RPC_RESPONSE.get();
|
return (Future<T>) RETURN_RPC_RESPONSE.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Set call id and retry count for the next call. */
|
/** Set call id and retry count for the next call. */
|
||||||
|
@ -356,11 +381,6 @@ public class Client implements AutoCloseable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getClass().getSimpleName() + id;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Indicate when the call is complete and the
|
/** Indicate when the call is complete and the
|
||||||
* value or error are available. Notifies by default. */
|
* value or error are available. Notifies by default. */
|
||||||
protected synchronized void callComplete() {
|
protected synchronized void callComplete() {
|
||||||
|
@ -1394,32 +1414,27 @@ public class Client implements AutoCloseable {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isAsynchronousMode()) {
|
if (isAsynchronousMode()) {
|
||||||
final AsyncGet<Writable, IOException> asyncGet
|
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
|
||||||
= new AsyncGet<Writable, IOException>() {
|
private final AtomicBoolean callled = new AtomicBoolean(false);
|
||||||
@Override
|
@Override
|
||||||
public Writable get(long timeout, TimeUnit unit)
|
public Writable get() throws InterruptedException, ExecutionException {
|
||||||
throws IOException, TimeoutException{
|
if (callled.compareAndSet(false, true)) {
|
||||||
boolean done = true;
|
|
||||||
try {
|
try {
|
||||||
final Writable w = getRpcResponse(call, connection, timeout, unit);
|
set(getRpcResponse(call, connection));
|
||||||
if (w == null) {
|
} catch (IOException ie) {
|
||||||
done = false;
|
setException(ie);
|
||||||
throw new TimeoutException(call + " timed out "
|
|
||||||
+ timeout + " " + unit);
|
|
||||||
}
|
|
||||||
return w;
|
|
||||||
} finally {
|
} finally {
|
||||||
if (done) {
|
|
||||||
releaseAsyncCall();
|
releaseAsyncCall();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return super.get();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
|
RETURN_RPC_RESPONSE.set(returnFuture);
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
return getRpcResponse(call, connection, -1, null);
|
return getRpcResponse(call, connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1455,18 +1470,12 @@ public class Client implements AutoCloseable {
|
||||||
return asyncCallCounter.get();
|
return asyncCallCounter.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the rpc response or, in case of timeout, null. */
|
private Writable getRpcResponse(final Call call, final Connection connection)
|
||||||
private Writable getRpcResponse(final Call call, final Connection connection,
|
throws IOException {
|
||||||
final long timeout, final TimeUnit unit) throws IOException {
|
|
||||||
synchronized (call) {
|
synchronized (call) {
|
||||||
while (!call.done) {
|
while (!call.done) {
|
||||||
try {
|
try {
|
||||||
final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
|
call.wait(); // wait for the result
|
||||||
timeout, unit);
|
|
||||||
call.wait(waitTimeout); // wait for the result
|
|
||||||
if (waitTimeout > 0 && !call.done) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new InterruptedIOException("Call interrupted");
|
throw new InterruptedIOException("Call interrupted");
|
||||||
|
|
|
@ -18,9 +18,21 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.io.DataInput;
|
||||||
import com.google.protobuf.*;
|
import java.io.DataOutput;
|
||||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Proxy;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.ProtoUtil;
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGet;
|
|
||||||
import org.apache.htrace.core.TraceScope;
|
import org.apache.htrace.core.TraceScope;
|
||||||
import org.apache.htrace.core.Tracer;
|
import org.apache.htrace.core.Tracer;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.io.DataInput;
|
import com.google.protobuf.BlockingService;
|
||||||
import java.io.DataOutput;
|
import com.google.protobuf.CodedOutputStream;
|
||||||
import java.io.IOException;
|
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
import java.io.OutputStream;
|
import com.google.protobuf.GeneratedMessage;
|
||||||
import java.lang.reflect.Method;
|
import com.google.protobuf.Message;
|
||||||
import java.lang.reflect.Proxy;
|
import com.google.protobuf.ServiceException;
|
||||||
import java.net.InetSocketAddress;
|
import com.google.protobuf.TextFormat;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RPC Engine for for protobuf based RPCs.
|
* RPC Engine for for protobuf based RPCs.
|
||||||
|
@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class ProtobufRpcEngine implements RpcEngine {
|
public class ProtobufRpcEngine implements RpcEngine {
|
||||||
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
||||||
private static final ThreadLocal<AsyncGet<Message, Exception>>
|
private static final ThreadLocal<Callable<?>>
|
||||||
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
|
RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
|
||||||
|
|
||||||
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
||||||
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
||||||
|
@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
|
|
||||||
private static final ClientCache CLIENTS = new ClientCache();
|
private static final ClientCache CLIENTS = new ClientCache();
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Unstable
|
@Unstable
|
||||||
public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
|
public static <T> Callable<T> getReturnMessageCallback() {
|
||||||
return ASYNC_RETURN_MESSAGE.get();
|
return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
|
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
|
||||||
|
@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Client.isAsynchronousMode()) {
|
if (Client.isAsynchronousMode()) {
|
||||||
final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
|
final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
|
||||||
final AsyncGet<Message, Exception> asyncGet
|
Callable<Message> callback = new Callable<Message>() {
|
||||||
= new AsyncGet<Message, Exception>() {
|
|
||||||
@Override
|
@Override
|
||||||
public Message get(long timeout, TimeUnit unit) throws Exception {
|
public Message call() throws Exception {
|
||||||
final RpcResponseWrapper rrw = timeout < 0?
|
return getReturnMessage(method, frrw.get());
|
||||||
frrw.get(): frrw.get(timeout, unit);
|
|
||||||
return getReturnMessage(method, rrw);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ASYNC_RETURN_MESSAGE.set(asyncGet);
|
RETURN_MESSAGE_CALLBACK.set(callback);
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
return getReturnMessage(method, val);
|
return getReturnMessage(method, val);
|
||||||
|
|
|
@ -1,60 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.util.concurrent;
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
|
|
||||||
*
|
|
||||||
* When the return value is still being computed, invoking
|
|
||||||
* {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
|
|
||||||
* The method should be invoked again and again
|
|
||||||
* until the underlying computation is completed.
|
|
||||||
*
|
|
||||||
* @param <R> The type of the return value.
|
|
||||||
* @param <E> The exception type that the underlying implementation may throw.
|
|
||||||
*/
|
|
||||||
public interface AsyncGet<R, E extends Throwable> {
|
|
||||||
/**
|
|
||||||
* Get the result.
|
|
||||||
*
|
|
||||||
* @param timeout The maximum time period to wait.
|
|
||||||
* When timeout == 0, it does not wait at all.
|
|
||||||
* When timeout < 0, it waits indefinitely.
|
|
||||||
* @param unit The unit of the timeout value
|
|
||||||
* @return the result, which is possibly null.
|
|
||||||
* @throws E an exception thrown by the underlying implementation.
|
|
||||||
* @throws TimeoutException if it cannot return after the given time period.
|
|
||||||
* @throws InterruptedException if the thread is interrupted.
|
|
||||||
*/
|
|
||||||
R get(long timeout, TimeUnit unit)
|
|
||||||
throws E, TimeoutException, InterruptedException;
|
|
||||||
|
|
||||||
/** Utility */
|
|
||||||
class Util {
|
|
||||||
/**
|
|
||||||
* @return {@link Object#wait(long)} timeout converted
|
|
||||||
* from {@link #get(long, TimeUnit)} timeout.
|
|
||||||
*/
|
|
||||||
public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
|
|
||||||
return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,73 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.util.concurrent;
|
|
||||||
|
|
||||||
import com.google.common.util.concurrent.AbstractFuture;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
/** A {@link Future} implemented using an {@link AsyncGet} object. */
|
|
||||||
public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
|
|
||||||
public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
|
|
||||||
|
|
||||||
private final AtomicBoolean called = new AtomicBoolean(false);
|
|
||||||
private final AsyncGet<T, E> asyncGet;
|
|
||||||
|
|
||||||
public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
|
|
||||||
this.asyncGet = asyncGet;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void callAsyncGet(long timeout, TimeUnit unit) {
|
|
||||||
if (!isCancelled() && called.compareAndSet(false, true)) {
|
|
||||||
try {
|
|
||||||
set(asyncGet.get(timeout, unit));
|
|
||||||
} catch (TimeoutException te) {
|
|
||||||
LOG.trace("TRACE", te);
|
|
||||||
called.compareAndSet(true, false);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.trace("TRACE", e);
|
|
||||||
setException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T get() throws InterruptedException, ExecutionException {
|
|
||||||
callAsyncGet(-1, TimeUnit.MILLISECONDS);
|
|
||||||
return super.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public T get(long timeout, TimeUnit unit)
|
|
||||||
throws InterruptedException, TimeoutException, ExecutionException {
|
|
||||||
callAsyncGet(timeout, unit);
|
|
||||||
return super.get(0, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDone() {
|
|
||||||
callAsyncGet(0, TimeUnit.MILLISECONDS);
|
|
||||||
return super.isDone();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,6 +18,20 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -34,17 +48,6 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
|
|
||||||
public class TestAsyncIPC {
|
public class TestAsyncIPC {
|
||||||
|
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
@ -84,51 +87,26 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
final long param = TestIPC.RANDOM.nextLong();
|
final long param = TestIPC.RANDOM.nextLong();
|
||||||
TestIPC.call(client, param, server, conf);
|
TestIPC.call(client, param, server, conf);
|
||||||
Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
|
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
|
||||||
returnFutures.put(i, returnFuture);
|
returnFutures.put(i, returnFuture);
|
||||||
expectedValues.put(i, param);
|
expectedValues.put(i, param);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
LOG.fatal("Caught: " + StringUtils.stringifyException(e));
|
||||||
failed = true;
|
failed = true;
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void assertReturnValues() throws InterruptedException, ExecutionException {
|
public void waitForReturnValues() throws InterruptedException,
|
||||||
|
ExecutionException {
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
LongWritable value = returnFutures.get(i).get();
|
LongWritable value = returnFutures.get(i).get();
|
||||||
Assert.assertEquals("call" + i + " failed.",
|
if (expectedValues.get(i) != value.get()) {
|
||||||
expectedValues.get(i).longValue(), value.get());
|
LOG.fatal(String.format("Call-%d failed!", i));
|
||||||
}
|
failed = true;
|
||||||
Assert.assertFalse(failed);
|
break;
|
||||||
}
|
|
||||||
|
|
||||||
void assertReturnValues(long timeout, TimeUnit unit)
|
|
||||||
throws InterruptedException, ExecutionException {
|
|
||||||
final boolean[] checked = new boolean[count];
|
|
||||||
for(boolean done = false; !done;) {
|
|
||||||
done = true;
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
if (checked[i]) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
done = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
final LongWritable value;
|
|
||||||
try {
|
|
||||||
value = returnFutures.get(i).get(timeout, unit);
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
LOG.info("call" + i + " caught ", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert.assertEquals("call" + i + " failed.",
|
|
||||||
expectedValues.get(i).longValue(), value.get());
|
|
||||||
checked[i] = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Assert.assertFalse(failed);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +183,7 @@ public class TestAsyncIPC {
|
||||||
|
|
||||||
private void doCall(final int idx, final long param) throws IOException {
|
private void doCall(final int idx, final long param) throws IOException {
|
||||||
TestIPC.call(client, param, server, conf);
|
TestIPC.call(client, param, server, conf);
|
||||||
Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
|
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
|
||||||
returnFutures.put(idx, returnFuture);
|
returnFutures.put(idx, returnFuture);
|
||||||
expectedValues.put(idx, param);
|
expectedValues.put(idx, param);
|
||||||
}
|
}
|
||||||
|
@ -255,7 +233,10 @@ public class TestAsyncIPC {
|
||||||
}
|
}
|
||||||
for (int i = 0; i < callerCount; i++) {
|
for (int i = 0; i < callerCount; i++) {
|
||||||
callers[i].join();
|
callers[i].join();
|
||||||
callers[i].assertReturnValues();
|
callers[i].waitForReturnValues();
|
||||||
|
String msg = String.format("Expected not failed for caller-%d: %s.", i,
|
||||||
|
callers[i]);
|
||||||
|
assertFalse(msg, callers[i].failed);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < clientCount; i++) {
|
for (int i = 0; i < clientCount; i++) {
|
||||||
clients[i].stop();
|
clients[i].stop();
|
||||||
|
@ -277,37 +258,25 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
|
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.assertReturnValues();
|
|
||||||
caller.assertReturnValues();
|
caller.waitForReturnValues();
|
||||||
caller.assertReturnValues();
|
String msg = String.format(
|
||||||
Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
|
"First time, expected not failed for caller: %s.", caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
|
|
||||||
|
caller.waitForReturnValues();
|
||||||
|
assertTrue(asyncCallCount == client.getAsyncCallCount());
|
||||||
|
msg = String.format("Second time, expected not failed for caller: %s.",
|
||||||
|
caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
|
|
||||||
|
assertTrue(asyncCallCount == client.getAsyncCallCount());
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
|
||||||
public void testFutureGetWithTimeout() throws IOException,
|
|
||||||
InterruptedException, ExecutionException {
|
|
||||||
// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
|
|
||||||
final Server server = new TestIPC.TestServer(10, true, conf);
|
|
||||||
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
||||||
server.start();
|
|
||||||
|
|
||||||
final Client client = new Client(LongWritable.class, conf);
|
|
||||||
|
|
||||||
try {
|
|
||||||
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
|
||||||
caller.run();
|
|
||||||
caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
|
|
||||||
} finally {
|
|
||||||
client.stop();
|
|
||||||
server.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
|
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
|
||||||
int clientCount, int callerCount, int callCount) throws IOException,
|
int clientCount, int callerCount, int callCount) throws IOException,
|
||||||
InterruptedException, ExecutionException {
|
InterruptedException, ExecutionException {
|
||||||
|
@ -398,7 +367,9 @@ public class TestAsyncIPC {
|
||||||
server.start();
|
server.start();
|
||||||
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
|
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.assertReturnValues();
|
caller.waitForReturnValues();
|
||||||
|
String msg = String.format("Expected not failed for caller: %s.", caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
|
@ -435,7 +406,9 @@ public class TestAsyncIPC {
|
||||||
server.start();
|
server.start();
|
||||||
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.assertReturnValues();
|
caller.waitForReturnValues();
|
||||||
|
String msg = String.format("Expected not failed for caller: %s.", caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
|
@ -470,7 +443,9 @@ public class TestAsyncIPC {
|
||||||
server.start();
|
server.start();
|
||||||
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
||||||
caller.run();
|
caller.run();
|
||||||
caller.assertReturnValues();
|
caller.waitForReturnValues();
|
||||||
|
String msg = String.format("Expected not failed for caller: %s.", caller);
|
||||||
|
assertFalse(msg, caller.failed);
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
|
@ -514,7 +489,10 @@ public class TestAsyncIPC {
|
||||||
}
|
}
|
||||||
for (int i = 0; i < callerCount; ++i) {
|
for (int i = 0; i < callerCount; ++i) {
|
||||||
callers[i].join();
|
callers[i].join();
|
||||||
callers[i].assertReturnValues();
|
callers[i].waitForReturnValues();
|
||||||
|
String msg = String.format("Expected not failed for caller-%d: %s.", i,
|
||||||
|
callers[i]);
|
||||||
|
assertFalse(msg, callers[i].failed);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
|
|
|
@ -19,17 +19,20 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGet;
|
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
|
|
||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.AbstractFuture;
|
||||||
|
|
||||||
/****************************************************************
|
/****************************************************************
|
||||||
* Implementation of the asynchronous distributed file system.
|
* Implementation of the asynchronous distributed file system.
|
||||||
* This instance of this class is the way end-user code interacts
|
* This instance of this class is the way end-user code interacts
|
||||||
|
@ -49,9 +52,22 @@ public class AsyncDistributedFileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
static <T> Future<T> getReturnValue() {
|
static <T> Future<T> getReturnValue() {
|
||||||
final AsyncGet<T, Exception> asyncGet
|
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
|
||||||
= ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
|
.getReturnValueCallback();
|
||||||
return new AsyncGetFuture<>(asyncGet);
|
Future<T> returnFuture = new AbstractFuture<T>() {
|
||||||
|
private final AtomicBoolean called = new AtomicBoolean(false);
|
||||||
|
public T get() throws InterruptedException, ExecutionException {
|
||||||
|
if (called.compareAndSet(false, true)) {
|
||||||
|
try {
|
||||||
|
set(returnValueCallback.call());
|
||||||
|
} catch (Exception e) {
|
||||||
|
setException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return returnFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,8 +24,7 @@ import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -191,7 +190,6 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.util.concurrent.AsyncGet;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
|
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
|
||||||
|
@ -203,8 +201,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
|
||||||
public class ClientNamenodeProtocolTranslatorPB implements
|
public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
|
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
|
||||||
final private ClientNamenodeProtocolPB rpcProxy;
|
final private ClientNamenodeProtocolPB rpcProxy;
|
||||||
private static final ThreadLocal<AsyncGet<?, Exception>>
|
private static final ThreadLocal<Callable<?>>
|
||||||
ASYNC_RETURN_VALUE = new ThreadLocal<>();
|
RETURN_VALUE_CALLBACK = new ThreadLocal<>();
|
||||||
|
|
||||||
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
|
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
|
||||||
GetServerDefaultsRequestProto.newBuilder().build();
|
GetServerDefaultsRequestProto.newBuilder().build();
|
||||||
|
@ -239,8 +237,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Unstable
|
@Unstable
|
||||||
public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
|
public static <T> Callable<T> getReturnValueCallback() {
|
||||||
return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
|
return (Callable<T>) RETURN_VALUE_CALLBACK.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -362,7 +360,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
try {
|
try {
|
||||||
if (Client.isAsynchronousMode()) {
|
if (Client.isAsynchronousMode()) {
|
||||||
rpcProxy.setPermission(null, req);
|
rpcProxy.setPermission(null, req);
|
||||||
setAsyncReturnValue();
|
setReturnValueCallback();
|
||||||
} else {
|
} else {
|
||||||
rpcProxy.setPermission(null, req);
|
rpcProxy.setPermission(null, req);
|
||||||
}
|
}
|
||||||
|
@ -371,18 +369,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setAsyncReturnValue() {
|
private void setReturnValueCallback() {
|
||||||
final AsyncGet<Message, Exception> asyncReturnMessage
|
final Callable<Message> returnMessageCallback = ProtobufRpcEngine
|
||||||
= ProtobufRpcEngine.getAsyncReturnMessage();
|
.getReturnMessageCallback();
|
||||||
final AsyncGet<Void, Exception> asyncGet
|
Callable<Void> callBack = new Callable<Void>() {
|
||||||
= new AsyncGet<Void, Exception>() {
|
|
||||||
@Override
|
@Override
|
||||||
public Void get(long timeout, TimeUnit unit) throws Exception {
|
public Void call() throws Exception {
|
||||||
asyncReturnMessage.get(timeout, unit);
|
returnMessageCallback.call();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ASYNC_RETURN_VALUE.set(asyncGet);
|
RETURN_VALUE_CALLBACK.set(callBack);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -397,7 +394,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
try {
|
try {
|
||||||
if (Client.isAsynchronousMode()) {
|
if (Client.isAsynchronousMode()) {
|
||||||
rpcProxy.setOwner(null, req.build());
|
rpcProxy.setOwner(null, req.build());
|
||||||
setAsyncReturnValue();
|
setReturnValueCallback();
|
||||||
} else {
|
} else {
|
||||||
rpcProxy.setOwner(null, req.build());
|
rpcProxy.setOwner(null, req.build());
|
||||||
}
|
}
|
||||||
|
@ -529,7 +526,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
try {
|
try {
|
||||||
if (Client.isAsynchronousMode()) {
|
if (Client.isAsynchronousMode()) {
|
||||||
rpcProxy.rename2(null, req);
|
rpcProxy.rename2(null, req);
|
||||||
setAsyncReturnValue();
|
setReturnValueCallback();
|
||||||
} else {
|
} else {
|
||||||
rpcProxy.rename2(null, req);
|
rpcProxy.rename2(null, req);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue