HADOOP-8533. Remove parallel call ununsed capability in RPC. Contributed by Brandon Li.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356504 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-07-02 22:15:44 +00:00
parent 72a5f92e69
commit 22822df7c3
8 changed files with 3 additions and 265 deletions

View File

@ -82,6 +82,9 @@ Trunk (unreleased changes)
HADOOP-8059. Add javadoc to InterfaceAudience and InterfaceStability.
(Brandon Li via suresh)
HADOOP-8533. Remove parallel call ununsed capability in RPC.
(Brandon Li via suresh)
BUG FIXES
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.

View File

@ -971,43 +971,6 @@ private void cleanupCalls() {
}
}
/** Call implementation used for parallel calls. */
private class ParallelCall extends Call {
private ParallelResults results;
private int index;
public ParallelCall(Writable param, ParallelResults results, int index) {
super(RPC.RpcKind.RPC_WRITABLE, param);
this.results = results;
this.index = index;
}
/** Deliver result to result collector. */
protected void callComplete() {
results.callComplete(this);
}
}
/** Result collector for parallel calls. */
private static class ParallelResults {
private Writable[] values;
private int size;
private int count;
public ParallelResults(int size) {
this.values = new Writable[size];
this.size = size;
}
/** Collect a result. */
public synchronized void callComplete(ParallelCall call) {
values[call.index] = call.getRpcResult(); // store the value
count++; // count it
if (count == size) // if all values are in
notify(); // then notify waiting caller
}
}
/** Construct an IPC client whose values are of the given {@link Writable}
* class. */
public Client(Class<? extends Writable> valueClass, Configuration conf,
@ -1209,63 +1172,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
}
}
/**
* @deprecated Use {@link #call(Writable[], InetSocketAddress[],
* Class, UserGroupInformation, Configuration)} instead
*/
@Deprecated
public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
throws IOException, InterruptedException {
return call(params, addresses, null, null, conf);
}
/**
* @deprecated Use {@link #call(Writable[], InetSocketAddress[],
* Class, UserGroupInformation, Configuration)} instead
*/
@Deprecated
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
Class<?> protocol, UserGroupInformation ticket)
throws IOException, InterruptedException {
return call(params, addresses, protocol, ticket, conf);
}
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
Class<?> protocol, UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException {
if (addresses.length == 0) return new Writable[0];
ParallelResults results = new ParallelResults(params.length);
synchronized (results) {
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i],
protocol, ticket, 0, conf);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
LOG.info("Calling "+addresses[i]+" caught: " +
e.getMessage(),e);
results.size--; // wait for one fewer result
}
}
while (results.count != results.size) {
try {
results.wait(); // wait for all results
} catch (InterruptedException e) {}
}
return results.values;
}
}
// for unit testing only
@InterfaceAudience.Private
@InterfaceStability.Unstable

View File

@ -244,12 +244,6 @@ public ConnectionId getConnectionId() {
}
}
@Override
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) {
throw new UnsupportedOperationException();
}
/**
* Writable Wrapper for Protocol Buffer Requests
*/

View File

@ -21,7 +21,6 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@ -627,27 +626,6 @@ public static void stopProxy(Object proxy) {
+ proxy.getClass());
}
/**
* Expert: Make multiple, parallel calls to a set of servers.
* @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead
*/
@Deprecated
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, Configuration conf)
throws IOException, InterruptedException {
return call(method, params, addrs, null, conf);
}
/** Expert: Make multiple, parallel calls to a set of servers. */
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException {
return getProtocolEngine(method.getDeclaringClass(), conf)
.call(method, params, addrs, ticket, conf);
}
/** Construct a server for a protocol implementation instance listening on a
* port and address.
* @deprecated protocol interface should be passed.

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
@ -44,11 +43,6 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException;
/** Expert: Make multiple, parallel calls to a set of servers. */
Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException;
/**
* Construct a server for a protocol implementation instance.
*

View File

@ -20,7 +20,6 @@
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
@ -274,36 +273,6 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
return new ProtocolProxy<T>(protocol, proxy, true);
}
/** Expert: Make multiple, parallel calls to a set of servers. */
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
Client client = CLIENTS.getClient(conf);
try {
Writable[] wrappedValues =
client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf);
if (method.getReturnType() == Void.TYPE) {
return null;
}
Object[] values =
(Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
for (int i = 0; i < values.length; i++)
if (wrappedValues[i] != null)
values[i] = ((ObjectWritable)wrappedValues[i]).get();
return values;
} finally {
CLIENTS.stopClient(client);
}
}
/* Construct a server for a protocol implementation instance listening on a
* port and address. */
@Override

View File

@ -149,41 +149,6 @@ public void run() {
}
}
private static class ParallelCaller extends Thread {
private Client client;
private int count;
private InetSocketAddress[] addresses;
private boolean failed;
public ParallelCaller(Client client, InetSocketAddress[] addresses,
int count) {
this.client = client;
this.addresses = addresses;
this.count = count;
}
public void run() {
for (int i = 0; i < count; i++) {
try {
Writable[] params = new Writable[addresses.length];
for (int j = 0; j < addresses.length; j++)
params[j] = new LongWritable(RANDOM.nextLong());
Writable[] values = client.call(params, addresses, null, null, conf);
for (int j = 0; j < addresses.length; j++) {
if (!params[j].equals(values[j])) {
LOG.fatal("Call failed!");
failed = true;
break;
}
}
} catch (Exception e) {
LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true;
}
}
}
}
@Test
public void testSerial() throws Exception {
testSerial(3, false, 2, 5, 100);
@ -217,52 +182,8 @@ public void testSerial(int handlerCount, boolean handlerSleep,
server.stop();
}
@Test
public void testParallel() throws Exception {
testParallel(10, false, 2, 4, 2, 4, 100);
}
public void testParallel(int handlerCount, boolean handlerSleep,
int serverCount, int addressCount,
int clientCount, int callerCount, int callCount)
throws Exception {
Server[] servers = new Server[serverCount];
for (int i = 0; i < serverCount; i++) {
servers[i] = new TestServer(handlerCount, handlerSleep);
servers[i].start();
}
InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
for (int i = 0; i < addressCount; i++) {
addresses[i] = NetUtils.getConnectAddress(servers[i%serverCount]);
}
Client[] clients = new Client[clientCount];
for (int i = 0; i < clientCount; i++) {
clients[i] = new Client(LongWritable.class, conf);
}
ParallelCaller[] callers = new ParallelCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
callers[i] =
new ParallelCaller(clients[i%clientCount], addresses, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
assertFalse(callers[i].failed);
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
}
for (int i = 0; i < serverCount; i++) {
servers[i].stop();
}
}
@Test
public void testStandAloneClient() throws Exception {
testParallel(10, false, 2, 4, 2, 4, 100);
Client client = new Client(LongWritable.class, conf);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
@ -781,13 +702,4 @@ private static abstract class NetworkTraces {
Ints.toByteArray(HADOOP0_21_ERROR_MSG.length()),
HADOOP0_21_ERROR_MSG.getBytes());
}
public static void main(String[] args) throws Exception {
//new TestIPC().testSerial(5, false, 2, 10, 1000);
new TestIPC().testParallel(10, false, 2, 4, 2, 4, 1000);
}
}

View File

@ -244,13 +244,6 @@ private static interface StoppedProtocol {
*/
private static class StoppedRpcEngine implements RpcEngine {
@Override
public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException {
return null;
}
@SuppressWarnings("unchecked")
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@ -491,17 +484,6 @@ private void testCallsInternal(Configuration conf) throws Exception {
}
}
// try some multi-calls
Method echo =
TestProtocol.class.getMethod("echo", new Class[] { String.class });
String[] strings = (String[])RPC.call(echo, new String[][]{{"a"},{"b"}},
new InetSocketAddress[] {addr, addr}, conf);
assertTrue(Arrays.equals(strings, new String[]{"a","b"}));
Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
Object[] voids = RPC.call(ping, new Object[][]{{},{}},
new InetSocketAddress[] {addr, addr}, conf);
assertEquals(voids, null);
} finally {
server.stop();
if(proxy!=null) RPC.stopProxy(proxy);