From 22822df7c3d08f157cf268b907083d0782d3fa67 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Mon, 2 Jul 2012 22:15:44 +0000 Subject: [PATCH] HADOOP-8533. Remove parallel call ununsed capability in RPC. Contributed by Brandon Li. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356504 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../java/org/apache/hadoop/ipc/Client.java | 94 ------------------- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 6 -- .../main/java/org/apache/hadoop/ipc/RPC.java | 22 ----- .../java/org/apache/hadoop/ipc/RpcEngine.java | 6 -- .../apache/hadoop/ipc/WritableRpcEngine.java | 31 ------ .../java/org/apache/hadoop/ipc/TestIPC.java | 88 ----------------- .../java/org/apache/hadoop/ipc/TestRPC.java | 18 ---- 8 files changed, 3 insertions(+), 265 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b56015d054e..6d25a024914 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -82,6 +82,9 @@ Trunk (unreleased changes) HADOOP-8059. Add javadoc to InterfaceAudience and InterfaceStability. (Brandon Li via suresh) + HADOOP-8533. Remove parallel call ununsed capability in RPC. + (Brandon Li via suresh) + BUG FIXES HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index d382c99f616..235bb342dae 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -971,43 +971,6 @@ private void cleanupCalls() { } } - /** Call implementation used for parallel calls. */ - private class ParallelCall extends Call { - private ParallelResults results; - private int index; - - public ParallelCall(Writable param, ParallelResults results, int index) { - super(RPC.RpcKind.RPC_WRITABLE, param); - this.results = results; - this.index = index; - } - - /** Deliver result to result collector. */ - protected void callComplete() { - results.callComplete(this); - } - } - - /** Result collector for parallel calls. */ - private static class ParallelResults { - private Writable[] values; - private int size; - private int count; - - public ParallelResults(int size) { - this.values = new Writable[size]; - this.size = size; - } - - /** Collect a result. */ - public synchronized void callComplete(ParallelCall call) { - values[call.index] = call.getRpcResult(); // store the value - count++; // count it - if (count == size) // if all values are in - notify(); // then notify waiting caller - } - } - /** Construct an IPC client whose values are of the given {@link Writable} * class. */ public Client(Class valueClass, Configuration conf, @@ -1209,63 +1172,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, } } - /** - * @deprecated Use {@link #call(Writable[], InetSocketAddress[], - * Class, UserGroupInformation, Configuration)} instead - */ - @Deprecated - public Writable[] call(Writable[] params, InetSocketAddress[] addresses) - throws IOException, InterruptedException { - return call(params, addresses, null, null, conf); - } - - /** - * @deprecated Use {@link #call(Writable[], InetSocketAddress[], - * Class, UserGroupInformation, Configuration)} instead - */ - @Deprecated - public Writable[] call(Writable[] params, InetSocketAddress[] addresses, - Class protocol, UserGroupInformation ticket) - throws IOException, InterruptedException { - return call(params, addresses, protocol, ticket, conf); - } - - - /** Makes a set of calls in parallel. Each parameter is sent to the - * corresponding address. When all values are available, or have timed out - * or errored, the collected results are returned in an array. The array - * contains nulls for calls that timed out or errored. */ - public Writable[] call(Writable[] params, InetSocketAddress[] addresses, - Class protocol, UserGroupInformation ticket, Configuration conf) - throws IOException, InterruptedException { - if (addresses.length == 0) return new Writable[0]; - - ParallelResults results = new ParallelResults(params.length); - synchronized (results) { - for (int i = 0; i < params.length; i++) { - ParallelCall call = new ParallelCall(params[i], results, i); - try { - ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i], - protocol, ticket, 0, conf); - Connection connection = getConnection(remoteId, call); - connection.sendParam(call); // send each parameter - } catch (IOException e) { - // log errors - LOG.info("Calling "+addresses[i]+" caught: " + - e.getMessage(),e); - results.size--; // wait for one fewer result - } - } - while (results.count != results.size) { - try { - results.wait(); // wait for all results - } catch (InterruptedException e) {} - } - - return results.values; - } - } - // for unit testing only @InterfaceAudience.Private @InterfaceStability.Unstable diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index d355a85d4fe..c05ce75ec8c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -244,12 +244,6 @@ public ConnectionId getConnectionId() { } } - @Override - public Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) { - throw new UnsupportedOperationException(); - } - /** * Writable Wrapper for Protocol Buffer Requests */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 6a8a71f83ad..5263758aaf7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -21,7 +21,6 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; -import java.lang.reflect.Method; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -627,27 +626,6 @@ public static void stopProxy(Object proxy) { + proxy.getClass()); } - /** - * Expert: Make multiple, parallel calls to a set of servers. - * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead - */ - @Deprecated - public static Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, Configuration conf) - throws IOException, InterruptedException { - return call(method, params, addrs, null, conf); - } - - /** Expert: Make multiple, parallel calls to a set of servers. */ - public static Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, - UserGroupInformation ticket, Configuration conf) - throws IOException, InterruptedException { - - return getProtocolEngine(method.getDeclaringClass(), conf) - .call(method, params, addrs, ticket, conf); - } - /** Construct a server for a protocol implementation instance listening on a * port and address. * @deprecated protocol interface should be passed. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 5dc48adef28..a8280bd2edf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ipc; import java.io.IOException; -import java.lang.reflect.Method; import java.net.InetSocketAddress; import javax.net.SocketFactory; @@ -44,11 +43,6 @@ ProtocolProxy getProxy(Class protocol, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException; - /** Expert: Make multiple, parallel calls to a set of servers. */ - Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, - UserGroupInformation ticket, Configuration conf) - throws IOException, InterruptedException; - /** * Construct a server for a protocol implementation instance. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index f61f0f2fd76..56dba25848b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -20,7 +20,6 @@ import java.lang.reflect.Proxy; import java.lang.reflect.Method; -import java.lang.reflect.Array; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; @@ -274,36 +273,6 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, return new ProtocolProxy(protocol, proxy, true); } - /** Expert: Make multiple, parallel calls to a set of servers. */ - public Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, - UserGroupInformation ticket, Configuration conf) - throws IOException, InterruptedException { - - Invocation[] invocations = new Invocation[params.length]; - for (int i = 0; i < params.length; i++) - invocations[i] = new Invocation(method, params[i]); - Client client = CLIENTS.getClient(conf); - try { - Writable[] wrappedValues = - client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf); - - if (method.getReturnType() == Void.TYPE) { - return null; - } - - Object[] values = - (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); - for (int i = 0; i < values.length; i++) - if (wrappedValues[i] != null) - values[i] = ((ObjectWritable)wrappedValues[i]).get(); - - return values; - } finally { - CLIENTS.stopClient(client); - } - } - /* Construct a server for a protocol implementation instance listening on a * port and address. */ @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 5797bb524bc..c7bc6411de2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -149,41 +149,6 @@ public void run() { } } - private static class ParallelCaller extends Thread { - private Client client; - private int count; - private InetSocketAddress[] addresses; - private boolean failed; - - public ParallelCaller(Client client, InetSocketAddress[] addresses, - int count) { - this.client = client; - this.addresses = addresses; - this.count = count; - } - - public void run() { - for (int i = 0; i < count; i++) { - try { - Writable[] params = new Writable[addresses.length]; - for (int j = 0; j < addresses.length; j++) - params[j] = new LongWritable(RANDOM.nextLong()); - Writable[] values = client.call(params, addresses, null, null, conf); - for (int j = 0; j < addresses.length; j++) { - if (!params[j].equals(values[j])) { - LOG.fatal("Call failed!"); - failed = true; - break; - } - } - } catch (Exception e) { - LOG.fatal("Caught: " + StringUtils.stringifyException(e)); - failed = true; - } - } - } - } - @Test public void testSerial() throws Exception { testSerial(3, false, 2, 5, 100); @@ -217,52 +182,8 @@ public void testSerial(int handlerCount, boolean handlerSleep, server.stop(); } - @Test - public void testParallel() throws Exception { - testParallel(10, false, 2, 4, 2, 4, 100); - } - - public void testParallel(int handlerCount, boolean handlerSleep, - int serverCount, int addressCount, - int clientCount, int callerCount, int callCount) - throws Exception { - Server[] servers = new Server[serverCount]; - for (int i = 0; i < serverCount; i++) { - servers[i] = new TestServer(handlerCount, handlerSleep); - servers[i].start(); - } - - InetSocketAddress[] addresses = new InetSocketAddress[addressCount]; - for (int i = 0; i < addressCount; i++) { - addresses[i] = NetUtils.getConnectAddress(servers[i%serverCount]); - } - - Client[] clients = new Client[clientCount]; - for (int i = 0; i < clientCount; i++) { - clients[i] = new Client(LongWritable.class, conf); - } - - ParallelCaller[] callers = new ParallelCaller[callerCount]; - for (int i = 0; i < callerCount; i++) { - callers[i] = - new ParallelCaller(clients[i%clientCount], addresses, callCount); - callers[i].start(); - } - for (int i = 0; i < callerCount; i++) { - callers[i].join(); - assertFalse(callers[i].failed); - } - for (int i = 0; i < clientCount; i++) { - clients[i].stop(); - } - for (int i = 0; i < serverCount; i++) { - servers[i].stop(); - } - } - @Test public void testStandAloneClient() throws Exception { - testParallel(10, false, 2, 4, 2, 4, 100); Client client = new Client(LongWritable.class, conf); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); try { @@ -781,13 +702,4 @@ private static abstract class NetworkTraces { Ints.toByteArray(HADOOP0_21_ERROR_MSG.length()), HADOOP0_21_ERROR_MSG.getBytes()); } - - public static void main(String[] args) throws Exception { - - //new TestIPC().testSerial(5, false, 2, 10, 1000); - - new TestIPC().testParallel(10, false, 2, 4, 2, 4, 1000); - - } - } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 5d3d335b32d..e2e32c75ba1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -244,13 +244,6 @@ private static interface StoppedProtocol { */ private static class StoppedRpcEngine implements RpcEngine { - @Override - public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, - UserGroupInformation ticket, Configuration conf) - throws IOException, InterruptedException { - return null; - } - @SuppressWarnings("unchecked") @Override public ProtocolProxy getProxy(Class protocol, long clientVersion, @@ -491,17 +484,6 @@ private void testCallsInternal(Configuration conf) throws Exception { } } - // try some multi-calls - Method echo = - TestProtocol.class.getMethod("echo", new Class[] { String.class }); - String[] strings = (String[])RPC.call(echo, new String[][]{{"a"},{"b"}}, - new InetSocketAddress[] {addr, addr}, conf); - assertTrue(Arrays.equals(strings, new String[]{"a","b"})); - - Method ping = TestProtocol.class.getMethod("ping", new Class[] {}); - Object[] voids = RPC.call(ping, new Object[][]{{},{}}, - new InetSocketAddress[] {addr, addr}, conf); - assertEquals(voids, null); } finally { server.stop(); if(proxy!=null) RPC.stopProxy(proxy);