HDFS-8202. Merging change 1305704 from trunk to 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1305723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-03-27 03:38:37 +00:00
parent 1340df3b0b
commit 2f20f8d140
4 changed files with 41 additions and 29 deletions

View File

@ -178,6 +178,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8204. TestHealthMonitor fails occasionally (todd) HADOOP-8204. TestHealthMonitor fails occasionally (todd)
HADOOP-8202. RPC stopProxy() does not close the proxy correctly.
(Hari Mankude via suresh)
BREAKDOWN OF HADOOP-7454 SUBTASKS BREAKDOWN OF HADOOP-7454 SUBTASKS
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.ha; package org.apache.hadoop.ha;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
@ -194,7 +195,9 @@ class HealthMonitor {
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " + LOG.warn("Transport-level exception trying to monitor health of " +
targetToMonitor + ": " + t.getLocalizedMessage()); targetToMonitor + ": " + t.getLocalizedMessage());
if (proxy instanceof Closeable) {
RPC.stopProxy(proxy); RPC.stopProxy(proxy);
}
proxy = null; proxy = null;
enterState(State.SERVICE_NOT_RESPONDING); enterState(State.SERVICE_NOT_RESPONDING);
Thread.sleep(sleepAfterDisconnectMillis); Thread.sleep(sleepAfterDisconnectMillis);

View File

@ -39,6 +39,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*; import org.apache.commons.logging.*;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@ -572,38 +573,41 @@ public class RPC {
} }
/** /**
* Stop this proxy and release its invoker's resource by getting the * Stop the proxy. Proxy must either implement {@link Closeable} or must have
* invocation handler for the given proxy object and calling * associated {@link RpcInvocationHandler}.
* {@link Closeable#close} if that invocation handler implements
* {@link Closeable}.
* *
* @param proxy the RPC proxy object to be stopped * @param proxy
* the RPC proxy object to be stopped
* @throws HadoopIllegalArgumentException
* if the proxy does not implement {@link Closeable} interface or
* does not have closeable {@link InvocationHandler}
*/ */
public static void stopProxy(Object proxy) { public static void stopProxy(Object proxy) {
if (proxy instanceof ProtocolTranslator) { if (proxy == null) {
RPC.stopProxy(((ProtocolTranslator)proxy) throw new HadoopIllegalArgumentException(
.getUnderlyingProxyObject()); "Cannot close proxy since it is null");
}
try {
if (proxy instanceof Closeable) {
((Closeable) proxy).close();
return;
} else {
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
if (handler instanceof Closeable) {
((Closeable) handler).close();
return; return;
} }
InvocationHandler invocationHandler = null;
try {
invocationHandler = Proxy.getInvocationHandler(proxy);
} catch (IllegalArgumentException e) {
LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
} }
if (proxy != null && invocationHandler != null &&
invocationHandler instanceof Closeable) {
try {
((Closeable)invocationHandler).close();
} catch (IOException e) { } catch (IOException e) {
LOG.error("Stopping RPC invocation handler caused exception", e); LOG.error("Closing proxy or invocation handler caused exception", e);
} } catch (IllegalArgumentException e) {
} else { LOG.error("RPC.stopProxy called on non proxy.", e);
LOG.error("Could not get invocation handler " + invocationHandler +
" for proxy class " + (proxy == null ? null : proxy.getClass()) +
", or invocation handler is not closeable.");
} }
throw new HadoopIllegalArgumentException(
"Cannot close proxy - is not Closeable or "
+ "does not provide closeable invocation handler "
+ proxy.getClass());
} }
/** /**

View File

@ -33,6 +33,7 @@ import java.util.Arrays;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.commons.logging.*; import org.apache.commons.logging.*;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.UTF8;
@ -584,9 +585,10 @@ public class TestRPC {
* Test stopping a non-registered proxy * Test stopping a non-registered proxy
* @throws Exception * @throws Exception
*/ */
@Test @Test(expected=HadoopIllegalArgumentException.class)
public void testStopNonRegisteredProxy() throws Exception { public void testStopNonRegisteredProxy() throws Exception {
RPC.stopProxy(mock(TestProtocol.class)); RPC.stopProxy(mock(TestProtocol.class));
RPC.stopProxy(null);
} }
@Test @Test