diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
index 812a46e02bd..ae37d0bed4a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
@@ -27,28 +27,28 @@
  * event of failover, and always returns the same proxy object.
  */
 @InterfaceStability.Evolving
-public class DefaultFailoverProxyProvider implements FailoverProxyProvider {
+public class DefaultFailoverProxyProvider<T> implements FailoverProxyProvider<T> {
 
-  private Object proxy;
-  private Class<?> iface;
+  private T proxy;
+  private Class<T> iface;
 
-  public DefaultFailoverProxyProvider(Class<?> iface, Object proxy) {
+  public DefaultFailoverProxyProvider(Class<T> iface, T proxy) {
     this.proxy = proxy;
     this.iface = iface;
   }
 
   @Override
-  public Class<?> getInterface() {
+  public Class<T> getInterface() {
     return iface;
   }
 
   @Override
-  public Object getProxy() {
+  public T getProxy() {
     return proxy;
   }
 
   @Override
-  public void performFailover(Object currentProxy) {
+  public void performFailover(T currentProxy) {
     // Nothing to do.
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
index 707a40d8888..ba7d29f0d52 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
@@ -29,7 +29,7 @@
  * {@link RetryPolicy}.
  */
 @InterfaceStability.Evolving
-public interface FailoverProxyProvider extends Closeable {
+public interface FailoverProxyProvider<T> extends Closeable {
 
   /**
    * Get the proxy object which should be used until the next failover event
@@ -37,7 +37,7 @@ public interface FailoverProxyProvider extends Closeable {
    *
    * @return the proxy object to invoke methods upon
    */
-  public Object getProxy();
+  public T getProxy();
 
   /**
    * Called whenever the associated {@link RetryPolicy} determines that an error
@@ -46,7 +46,7 @@ public interface FailoverProxyProvider extends Closeable {
    * @param currentProxy the proxy object which was being used before this
    *        failover event
    */
-  public void performFailover(Object currentProxy);
+  public void performFailover(T currentProxy);
 
   /**
    * Return a reference to the interface this provider's proxy objects actually
@@ -58,5 +58,5 @@ public interface FailoverProxyProvider extends Closeable {
    * @return the interface implemented by the proxy objects returned by
    *         {@link FailoverProxyProvider#getProxy()}
    */
-  public Class<?> getInterface();
+  public Class<T> getInterface();
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
index 0a2963f7be5..2a6dc2622fd 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
@@ -28,19 +28,20 @@
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class TestFailoverProxy {
 
-  public static class FlipFlopProxyProvider implements FailoverProxyProvider {
+  public static class FlipFlopProxyProvider<T> implements FailoverProxyProvider<T> {
 
-    private Class<?> iface;
-    private Object currentlyActive;
-    private Object impl1;
-    private Object impl2;
+    private Class<T> iface;
+    private T currentlyActive;
+    private T impl1;
+    private T impl2;
 
     private int failoversOccurred = 0;
 
-    public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
-        Object standbyImpl) {
+    public FlipFlopProxyProvider(Class<T> iface, T activeImpl,
+        T standbyImpl) {
       this.iface = iface;
       this.impl1 = activeImpl;
       this.impl2 = standbyImpl;
@@ -48,7 +49,7 @@ public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
     }
 
     @Override
-    public Object getProxy() {
+    public T getProxy() {
       return currentlyActive;
     }
 
@@ -59,7 +60,7 @@ public synchronized void performFailover(Object currentProxy) {
     }
 
     @Override
-    public Class<?> getInterface() {
+    public Class<T> getInterface() {
       return iface;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index e694bed695d..e5f5ede8d90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -109,3 +109,5 @@ HDFS-2789. TestHAAdmin.testFailover is failing (eli)
 HDFS-2747. Entering safe mode after starting SBN can NPE. (Uma Maheswara Rao G via todd)
 
 HDFS-2772. On transition to active, standby should not swallow ELIE. (atm)
+
+HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol.
+(Uma Maheswara Rao G via todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 46fa863f3b1..71014118a23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -94,9 +94,6 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -109,7 +106,6 @@
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and
@@ -312,20 +308,10 @@ public DFSClient(URI nameNodeUri, Configuration conf,
     this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
     this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-
-    Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(
-        nameNodeUri, conf);
-
-    if (nameNodeUri != null && failoverProxyProviderClass != null) {
-      FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
-          ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
-      this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
-          failoverProxyProvider,
-          RetryPolicies.failoverOnNetworkException(
-              RetryPolicies.TRY_ONCE_THEN_FAIL,
-              dfsClientConf.maxFailoverAttempts,
-              dfsClientConf.failoverSleepBaseMillis,
-              dfsClientConf.failoverSleepMaxMillis));
+    ClientProtocol failoverNNProxy = (ClientProtocol) HAUtil
+        .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
+    if (nameNodeUri != null && failoverNNProxy != null) {
+      this.namenode = failoverNNProxy;
       nnAddress = null;
     } else if (nameNodeUri != null && rpcNamenode == null) {
       this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf);
@@ -353,39 +339,6 @@ public DFSClient(URI nameNodeUri, Configuration conf,
       LOG.debug("Short circuit read is " + shortCircuitLocalReads);
     }
   }
-
-  private Class<?> getFailoverProxyProviderClass(URI nameNodeUri, Configuration conf)
-      throws IOException {
-    if (nameNodeUri == null) {
-      return null;
-    }
-    String host = nameNodeUri.getHost();
-
-    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host;
-    try {
-      Class<?> ret = conf.getClass(configKey, null);
-      if (ret != null) {
-        // If we found a proxy provider, then this URI should be a logical NN.
-        // Given that, it shouldn't have a non-default port number.
-        int port = nameNodeUri.getPort();
-        if (port > 0 && port != NameNode.DEFAULT_PORT) {
-          throw new IOException(
-              "Port " + port + " specified in URI " + nameNodeUri +
-              " but host '" + host + "' is a logical (HA) namenode" +
-              " and does not use port information.");
-        }
-      }
-      return ret;
-    } catch (RuntimeException e) {
-      if (e.getCause() instanceof ClassNotFoundException) {
-        throw new IOException("Could not load failover proxy provider class "
-            + conf.get(configKey) + " which is configured for authority " + nameNodeUri,
-            e);
-      } else {
-        throw e;
-      }
-    }
-  }
 
   /**
    * Return the number of times the client should go back to the namenode
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 29cb3b3339f..d5dc5b30c54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -28,10 +28,12 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
 
@@ -47,7 +49,12 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -809,6 +816,32 @@ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
     return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
 
+  /**
+   * Build a NamenodeProtocol connection to the namenode and set up the retry
+   * policy
+   */
+  public static NamenodeProtocolTranslatorPB createNNProxyWithNamenodeProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
+        TimeUnit.MILLISECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
+        = new HashMap<Class<? extends Exception>, RetryPolicy>();
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
+        exceptionToPolicyMap);
+    Map<String, RetryPolicy> methodNameToPolicyMap =
+        new HashMap<String, RetryPolicy>();
+    methodNameToPolicyMap.put("getBlocks", methodPolicy);
+    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
+    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class, RPC
+        .getProtocolVersion(NamenodeProtocolPB.class), address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf));
+    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
+        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+    return new NamenodeProtocolTranslatorPB(retryProxy);
+  }
+
   /**
    * Get nameservice Id for the {@link NameNode} based on namenode RPC address
    * matching the local node address.
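The per-method retry wiring added in createNNProxyWithNamenodeProtocol above is easy to lose inside the diff, so here is a minimal, self-contained sketch of the same pattern. It is illustrative only: BlockService and withRetries are hypothetical names that are not part of this patch; only the RetryPolicies/RetryProxy calls mirror what the patch does.

// Hedged sketch of the per-method retry-policy pattern used by
// DFSUtil.createNNProxyWithNamenodeProtocol. "BlockService" and
// "withRetries" are hypothetical stand-ins for illustration.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;

public class RetryWiringExample {
  /** A hypothetical protocol interface, standing in for NamenodeProtocolPB. */
  interface BlockService {
    long getBlocks() throws IOException;
    long getAccessKeys() throws IOException;
    void report() throws IOException; // intentionally absent from the policy map
  }

  public static BlockService withRetries(BlockService raw) {
    // Retry a failed call up to 5 times, backing off exponentially from 200 ms.
    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
        TimeUnit.MILLISECONDS);
    // The exception map is empty, so every exception type falls through to
    // timeoutPolicy.
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
        exceptionToPolicyMap);
    // Only the listed methods are retried; methods absent from the map keep
    // the proxy's default try-once behavior.
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("getBlocks", methodPolicy);
    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
    return (BlockService) RetryProxy.create(BlockService.class, raw,
        methodNameToPolicyMap);
  }
}

Note how only getBlocks and getAccessKeys appear in the map, matching the patch: both are idempotent reads, so retrying them after a timeout is safe.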
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 6a619712c48..1dc2bf67581 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -18,13 +18,23 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
+import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.net.URI;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
 
 public class HAUtil {
   private HAUtil() { /* Hidden constructor */ }
@@ -110,5 +120,84 @@ public static boolean shouldAllowStandbyReads(Configuration conf) {
   public static void setAllowStandbyReads(Configuration conf, boolean val) {
     conf.setBoolean("dfs.ha.allow.stale.reads", val);
   }
+
+  /** Creates the Failover proxy provider instance*/
+  @SuppressWarnings("unchecked")
+  public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+      Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
+      Class<T> xface) throws IOException {
+    Preconditions.checkArgument(
+        xface.isAssignableFrom(NamenodeProtocols.class),
+        "Interface %s is not a NameNode protocol", xface);
+    try {
+      Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
+          .getConstructor(Class.class);
+      FailoverProxyProvider<T> provider = ctor.newInstance(xface);
+      ReflectionUtils.setConf(provider, conf);
+      return (FailoverProxyProvider<T>) provider;
+    } catch (Exception e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new IOException(
+            "Couldn't create proxy provider " + failoverProxyProviderClass, e);
+      }
+    }
+  }
+
+  /** Gets the configured Failover proxy provider's class */
+  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+      Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
+    if (nameNodeUri == null) {
+      return null;
+    }
+    String host = nameNodeUri.getHost();
+
+    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
+        + host;
+    try {
+      @SuppressWarnings("unchecked")
+      Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
+          .getClass(configKey, null, FailoverProxyProvider.class);
+      if (ret != null) {
+        // If we found a proxy provider, then this URI should be a logical NN.
+        // Given that, it shouldn't have a non-default port number.
+        int port = nameNodeUri.getPort();
+        if (port > 0 && port != NameNode.DEFAULT_PORT) {
+          throw new IOException("Port " + port + " specified in URI "
+              + nameNodeUri + " but host '" + host
+              + "' is a logical (HA) namenode"
+              + " and does not use port information.");
+        }
+      }
+      return ret;
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof ClassNotFoundException) {
+        throw new IOException("Could not load failover proxy provider class "
+            + conf.get(configKey) + " which is configured for authority "
+            + nameNodeUri, e);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /** Creates the namenode proxy with the passed Protocol */
+  @SuppressWarnings("unchecked")
+  public static <T> Object createFailoverProxy(Configuration conf, URI nameNodeUri,
+      Class<T> xface) throws IOException {
+    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = HAUtil
+        .getFailoverProxyProviderClass(conf, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) {
+      FailoverProxyProvider<T> failoverProxyProvider = HAUtil
+          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface);
+      Conf config = new Conf(conf);
+      return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
+          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+              config.maxFailoverAttempts, config.failoverSleepBaseMillis,
+              config.failoverSleepMaxMillis));
+    }
+    return null;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 227ad722b98..939105871cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -22,7 +22,6 @@
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -58,7 +57,6 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 
 /**
@@ -88,7 +86,8 @@ class NameNodeConnector {
     InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
     // TODO(HA): need to deal with connecting to HA NN pair here
     this.namenodeAddress = nn;
-    this.namenode = createNamenode(nn, conf);
+    this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf,
+        UserGroupInformation.getCurrentUser());
     this.client = DFSUtil.createNamenode(conf);
     this.fs = FileSystem.get(NameNode.getUri(nn), conf);
 
@@ -196,33 +195,6 @@ public String toString() {
         + "]";
   }
 
-  /** Build a NamenodeProtocol connection to the namenode and
-   *  set up the retry policy
-   */
-  private static NamenodeProtocol createNamenode(InetSocketAddress address,
-      Configuration conf) throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        timeoutPolicy, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-        new HashMap<String, RetryPolicy>();
-    methodNameToPolicyMap.put("getBlocks", methodPolicy);
-    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
-    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    NamenodeProtocolPB proxy =
-        RPC.getProxy(NamenodeProtocolPB.class,
-            RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
-            UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf));
-    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
-        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
-    return new NamenodeProtocolTranslatorPB(retryProxy);
-  }
 
   /**
    * Periodically updates access keys.
   */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 65e4655b52a..c44c1c1d74b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -32,52 +32,75 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A FailoverProxyProvider implementation which allows one to configure two URIs
  * to connect to during fail-over. The first configured address is tried first,
  * and on a fail-over event the other address is tried.
  */
-public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
-    Configurable {
+public class ConfiguredFailoverProxyProvider<T> implements
+    FailoverProxyProvider<T>, Configurable {
 
   private static final Log LOG =
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
 
   private Configuration conf;
   private int currentProxyIndex = 0;
-  private List<AddressRpcProxyPair> proxies = new ArrayList<AddressRpcProxyPair>();
+  private List<AddressRpcProxyPair<T>> proxies = new ArrayList<AddressRpcProxyPair<T>>();
   private UserGroupInformation ugi;
+  private final Class<T> xface;
 
+  public ConfiguredFailoverProxyProvider(Class<T> xface) {
+    Preconditions.checkArgument(
+        xface.isAssignableFrom(NamenodeProtocols.class),
+        "Interface class %s is not a valid NameNode protocol!", xface);
+    this.xface = xface;
+  }
+
   @Override
-  public Class<?> getInterface() {
-    return ClientProtocol.class;
+  public Class<T> getInterface() {
+    return xface;
   }
 
   /**
    * Lazily initialize the RPC proxy object.
   */
+  @SuppressWarnings("unchecked")
   @Override
-  public synchronized Object getProxy() {
+  public synchronized T getProxy() {
     AddressRpcProxyPair current = proxies.get(currentProxyIndex);
     if (current.namenode == null) {
       try {
-        // TODO(HA): This will create a NN proxy with an underlying retry
-        // proxy. We don't want this.
-        current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+        if (NamenodeProtocol.class.equals(xface)) {
+          current.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
+              current.address, conf, ugi);
+        } else if (ClientProtocol.class.equals(xface)) {
+          // TODO(HA): This will create a NN proxy with an underlying retry
+          // proxy. We don't want this.
+          current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+        } else {
+          throw new IllegalStateException(
+              "Unsupported protocol found when creating the proxy connection to NameNode. "
+                  + ((xface != null) ? xface.getName() : xface)
+                  + " is not supported by " + this.getClass().getName());
+        }
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);
       }
     }
-    return current.namenode;
+    return (T)current.namenode;
   }
 
   @Override
-  public synchronized void performFailover(Object currentProxy) {
+  public synchronized void performFailover(T currentProxy) {
     currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
   }
@@ -113,7 +136,7 @@ public synchronized void setConf(Configuration conf) {
 
       Map<String, InetSocketAddress> addressesInNN = map.get(nsId);
       for (InetSocketAddress address : addressesInNN.values()) {
-        proxies.add(new AddressRpcProxyPair(address));
+        proxies.add(new AddressRpcProxyPair<T>(address));
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -124,9 +147,9 @@ public synchronized void setConf(Configuration conf) {
    * A little pair object to store the address and connected RPC proxy object to
    * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
   */
-  private static class AddressRpcProxyPair {
+  private static class AddressRpcProxyPair<T> {
     public InetSocketAddress address;
-    public ClientProtocol namenode;
+    public T namenode;
 
     public AddressRpcProxyPair(InetSocketAddress address) {
       this.address = address;
@@ -139,7 +162,7 @@ public AddressRpcProxyPair(InetSocketAddress address) {
   */
   @Override
   public synchronized void close() throws IOException {
-    for (AddressRpcProxyPair proxy : proxies) {
+    for (AddressRpcProxyPair<T> proxy : proxies) {
       if (proxy.namenode != null) {
         if (proxy.namenode instanceof Closeable) {
           ((Closeable)proxy.namenode).close();
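The excerpt ends mid-hunk above, but the shape of the new API is worth seeing from the caller's side. Below is a hedged, self-contained sketch of how the generified FailoverProxyProvider<T> composes with RetryProxy after this patch: MyProtocol, TwoTargetProvider, and wrap are hypothetical illustration names, and the numeric policy values are arbitrary; only the FailoverProxyProvider contract and the RetryProxy/RetryPolicies calls mirror the patch.

// Hedged usage sketch for the generified failover API. "MyProtocol",
// "TwoTargetProvider", and "wrap" are hypothetical, not part of the patch.
import java.io.IOException;

import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;

public class FailoverExample {
  /** A hypothetical NN-style protocol interface. */
  interface MyProtocol {
    String status() throws IOException;
  }

  /** A two-target provider in the spirit of ConfiguredFailoverProxyProvider. */
  static class TwoTargetProvider<T> implements FailoverProxyProvider<T> {
    private final Class<T> xface;
    private final T first;
    private final T second;
    private T current;

    TwoTargetProvider(Class<T> xface, T first, T second) {
      this.xface = xface;
      this.first = first;
      this.second = second;
      this.current = first;
    }

    @Override
    public Class<T> getInterface() {
      return xface;
    }

    @Override
    public synchronized T getProxy() {
      return current;
    }

    @Override
    public synchronized void performFailover(T currentProxy) {
      // Flip to the other target on every failover event.
      current = (current == first) ? second : first;
    }

    @Override
    public void close() throws IOException {
      // Nothing to close in this sketch.
    }
  }

  /** Wraps two concrete targets in a retrying, failover-aware proxy. */
  public static MyProtocol wrap(MyProtocol active, MyProtocol standby) {
    FailoverProxyProvider<MyProtocol> provider =
        new TwoTargetProvider<MyProtocol>(MyProtocol.class, active, standby);
    // Same policy shape as HAUtil.createFailoverProxy in this patch; the
    // numeric knobs are example values, not defaults from the patch.
    return (MyProtocol) RetryProxy.create(MyProtocol.class, provider,
        RetryPolicies.failoverOnNetworkException(
            RetryPolicies.TRY_ONCE_THEN_FAIL,
            15,      // max failover attempts
            500,     // failover sleep base, ms
            15000)); // failover sleep max, ms
  }
}

Because the provider is now typed, getProxy() hands back a T rather than an Object, and an unsupported protocol is rejected up front in ConfiguredFailoverProxyProvider's constructor or its getProxy() branch rather than surfacing as a ClassCastException at invocation time.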