HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. Contributed by Uma Maheswara Rao G.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1232284 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-01-17 03:10:25 +00:00
parent d880c7cc78
commit 212678f036
9 changed files with 191 additions and 118 deletions
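In short, this patch makes the client-side failover plumbing generic over the protocol class, so a caller can obtain a NamenodeProtocol proxy behind failover rather than only ClientProtocol. A minimal sketch of the new HAUtil entry point introduced below (the logical URI value is hypothetical):

    Configuration conf = new Configuration();
    URI nameNodeUri = URI.create("hdfs://ha-cluster"); // hypothetical logical HA authority
    // Returns a retrying failover proxy, or null when no provider is configured for the host.
    NamenodeProtocol namenode = (NamenodeProtocol) HAUtil.createFailoverProxy(
        conf, nameNodeUri, NamenodeProtocol.class);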

DefaultFailoverProxyProvider.java

@@ -27,28 +27,28 @@ import org.apache.hadoop.ipc.RPC;
  * event of failover, and always returns the same proxy object.
  */
 @InterfaceStability.Evolving
-public class DefaultFailoverProxyProvider implements FailoverProxyProvider {
-  private Object proxy;
-  private Class<?> iface;
+public class DefaultFailoverProxyProvider<T> implements FailoverProxyProvider<T> {
+  private T proxy;
+  private Class<T> iface;
 
-  public DefaultFailoverProxyProvider(Class<?> iface, Object proxy) {
+  public DefaultFailoverProxyProvider(Class<T> iface, T proxy) {
     this.proxy = proxy;
     this.iface = iface;
   }
 
   @Override
-  public Class<?> getInterface() {
+  public Class<T> getInterface() {
     return iface;
   }
 
   @Override
-  public Object getProxy() {
+  public T getProxy() {
     return proxy;
   }
 
   @Override
-  public void performFailover(Object currentProxy) {
+  public void performFailover(T currentProxy) {
     // Nothing to do.
   }

FailoverProxyProvider.java

@@ -29,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  * {@link RetryPolicy}.
  */
 @InterfaceStability.Evolving
-public interface FailoverProxyProvider extends Closeable {
+public interface FailoverProxyProvider<T> extends Closeable {
 
   /**
    * Get the proxy object which should be used until the next failover event
@@ -37,7 +37,7 @@ public interface FailoverProxyProvider extends Closeable {
    *
    * @return the proxy object to invoke methods upon
    */
-  public Object getProxy();
+  public T getProxy();
 
   /**
    * Called whenever the associated {@link RetryPolicy} determines that an error
@@ -46,7 +46,7 @@ public interface FailoverProxyProvider extends Closeable {
    * @param currentProxy the proxy object which was being used before this
    *        failover event
    */
-  public void performFailover(Object currentProxy);
+  public void performFailover(T currentProxy);
 
   /**
    * Return a reference to the interface this provider's proxy objects actually
@@ -58,5 +58,5 @@ public interface FailoverProxyProvider extends Closeable {
    * @return the interface implemented by the proxy objects returned by
    *         {@link FailoverProxyProvider#getProxy()}
    */
-  public Class<?> getInterface();
+  public Class<T> getInterface();
 }

TestFailoverProxy.java

@@ -28,19 +28,20 @@ import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class TestFailoverProxy {
 
-  public static class FlipFlopProxyProvider implements FailoverProxyProvider {
+  public static class FlipFlopProxyProvider<T> implements FailoverProxyProvider<T> {
 
-    private Class<?> iface;
-    private Object currentlyActive;
-    private Object impl1;
-    private Object impl2;
+    private Class<T> iface;
+    private T currentlyActive;
+    private T impl1;
+    private T impl2;
 
     private int failoversOccurred = 0;
 
-    public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
-        Object standbyImpl) {
+    public FlipFlopProxyProvider(Class<T> iface, T activeImpl,
+        T standbyImpl) {
       this.iface = iface;
       this.impl1 = activeImpl;
       this.impl2 = standbyImpl;
@@ -48,7 +49,7 @@ public class TestFailoverProxy {
     }
 
     @Override
-    public Object getProxy() {
+    public T getProxy() {
       return currentlyActive;
     }
 
@@ -59,7 +60,7 @@ public class TestFailoverProxy {
     }
 
     @Override
-    public Class<?> getInterface() {
+    public Class<T> getInterface() {
       return iface;
     }

CHANGES.HDFS-1623.txt

@@ -109,3 +109,5 @@ HDFS-2789. TestHAAdmin.testFailover is failing (eli)
 HDFS-2747. Entering safe mode after starting SBN can NPE. (Uma Maheswara Rao G via todd)
 
 HDFS-2772. On transition to active, standby should not swallow ELIE. (atm)
+
+HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. (Uma Maheswara Rao G via todd)

DFSClient.java

@@ -94,9 +94,6 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -109,7 +106,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and
@@ -312,20 +308,10 @@ public class DFSClient implements java.io.Closeable {
     this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
     this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
 
-    Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(
-        nameNodeUri, conf);
-
-    if (nameNodeUri != null && failoverProxyProviderClass != null) {
-      FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
-          ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
-      this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
-          failoverProxyProvider,
-          RetryPolicies.failoverOnNetworkException(
-              RetryPolicies.TRY_ONCE_THEN_FAIL,
-              dfsClientConf.maxFailoverAttempts,
-              dfsClientConf.failoverSleepBaseMillis,
-              dfsClientConf.failoverSleepMaxMillis));
+    ClientProtocol failoverNNProxy = (ClientProtocol) HAUtil
+        .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
+    if (nameNodeUri != null && failoverNNProxy != null) {
+      this.namenode = failoverNNProxy;
       nnAddress = null;
     } else if (nameNodeUri != null && rpcNamenode == null) {
       this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf);
@@ -353,39 +339,6 @@ public class DFSClient implements java.io.Closeable {
       LOG.debug("Short circuit read is " + shortCircuitLocalReads);
     }
   }
 
-  private Class<?> getFailoverProxyProviderClass(URI nameNodeUri, Configuration conf)
-      throws IOException {
-    if (nameNodeUri == null) {
-      return null;
-    }
-    String host = nameNodeUri.getHost();
-    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host;
-    try {
-      Class<?> ret = conf.getClass(configKey, null);
-      if (ret != null) {
-        // If we found a proxy provider, then this URI should be a logical NN.
-        // Given that, it shouldn't have a non-default port number.
-        int port = nameNodeUri.getPort();
-        if (port > 0 && port != NameNode.DEFAULT_PORT) {
-          throw new IOException(
-              "Port " + port + " specified in URI " + nameNodeUri +
-              " but host '" + host + "' is a logical (HA) namenode" +
-              " and does not use port information.");
-        }
-      }
-      return ret;
-    } catch (RuntimeException e) {
-      if (e.getCause() instanceof ClassNotFoundException) {
-        throw new IOException("Could not load failover proxy provider class "
-            + conf.get(configKey) + " which is configured for authority " + nameNodeUri,
-            e);
-      } else {
-        throw e;
-      }
-    }
-  }
-
   /**
    * Return the number of times the client should go back to the namenode
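The provider lookup removed above (and moved to HAUtil) keys off the URI host. A hedged sketch of wiring a provider up programmatically (host name hypothetical; key prefix assumed to match DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX):

    // Map the logical host "ha-cluster" to a provider class; HAUtil reads this back
    // via conf.getClass(configKey, null, FailoverProxyProvider.class).
    conf.setClass("dfs.client.failover.proxy.provider.ha-cluster",
        ConfiguredFailoverProxyProvider.class, FailoverProxyProvider.class);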

DFSUtil.java

@@ -28,10 +28,12 @@ import java.security.SecureRandom;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
 
@@ -47,7 +49,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -809,6 +816,32 @@ public class DFSUtil {
     return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
   }
 
+  /**
+   * Build a NamenodeProtocol connection to the namenode and set up the retry
+   * policy
+   */
+  public static NamenodeProtocolTranslatorPB createNNProxyWithNamenodeProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
+        TimeUnit.MILLISECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
+        = new HashMap<Class<? extends Exception>, RetryPolicy>();
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
+        exceptionToPolicyMap);
+    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
+    methodNameToPolicyMap.put("getBlocks", methodPolicy);
+    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
+    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class, RPC
+        .getProtocolVersion(NamenodeProtocolPB.class), address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf));
+    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
+        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+    return new NamenodeProtocolTranslatorPB(retryProxy);
+  }
+
   /**
    * Get nameservice Id for the {@link NameNode} based on namenode RPC address
    * matching the local node address.
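For a single (non-HA) NameNode address the new helper can also be called directly; a sketch, with the address lookup illustrative:

    InetSocketAddress nnAddr = NameNode.getAddress(conf); // default NN address
    // The returned translator wraps a PB proxy whose getBlocks/getAccessKeys calls
    // retry with exponential backoff (5 tries, 200 ms base).
    NamenodeProtocolTranslatorPB namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
        nnAddr, conf, UserGroupInformation.getCurrentUser());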

HAUtil.java

@@ -18,13 +18,23 @@
 
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
+import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.net.URI;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
 
 public class HAUtil {
   private HAUtil() { /* Hidden constructor */ }
@@ -110,5 +120,84 @@ public class HAUtil {
   public static void setAllowStandbyReads(Configuration conf, boolean val) {
     conf.setBoolean("dfs.ha.allow.stale.reads", val);
   }
 
+  /** Creates the Failover proxy provider instance */
+  @SuppressWarnings("unchecked")
+  public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+      Configuration conf, Class<FailoverProxyProvider<?>> failoverProxyProviderClass,
+      Class xface) throws IOException {
+    Preconditions.checkArgument(
+        xface.isAssignableFrom(NamenodeProtocols.class),
+        "Interface %s is not a NameNode protocol", xface);
+    try {
+      Constructor<FailoverProxyProvider<?>> ctor = failoverProxyProviderClass
+          .getConstructor(Class.class);
+      FailoverProxyProvider<?> provider = ctor.newInstance(xface);
+      ReflectionUtils.setConf(provider, conf);
+      return (FailoverProxyProvider<T>) provider;
+    } catch (Exception e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new IOException(
+            "Couldn't create proxy provider " + failoverProxyProviderClass, e);
+      }
+    }
+  }
+
+  /** Gets the configured Failover proxy provider's class */
+  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+      Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
+    if (nameNodeUri == null) {
+      return null;
+    }
+    String host = nameNodeUri.getHost();
+    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
+        + host;
+    try {
+      @SuppressWarnings("unchecked")
+      Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
+          .getClass(configKey, null, FailoverProxyProvider.class);
+      if (ret != null) {
+        // If we found a proxy provider, then this URI should be a logical NN.
+        // Given that, it shouldn't have a non-default port number.
+        int port = nameNodeUri.getPort();
+        if (port > 0 && port != NameNode.DEFAULT_PORT) {
+          throw new IOException("Port " + port + " specified in URI "
+              + nameNodeUri + " but host '" + host
+              + "' is a logical (HA) namenode"
+              + " and does not use port information.");
+        }
+      }
+      return ret;
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof ClassNotFoundException) {
+        throw new IOException("Could not load failover proxy provider class "
+            + conf.get(configKey) + " which is configured for authority "
+            + nameNodeUri, e);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /** Creates the namenode proxy with the passed Protocol */
+  @SuppressWarnings("unchecked")
+  public static Object createFailoverProxy(Configuration conf, URI nameNodeUri,
+      Class xface) throws IOException {
+    Class<FailoverProxyProvider<?>> failoverProxyProviderClass = HAUtil
+        .getFailoverProxyProviderClass(conf, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) {
+      FailoverProxyProvider<?> failoverProxyProvider = HAUtil
+          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface);
+      Conf config = new Conf(conf);
+      return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
+          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+              config.maxFailoverAttempts, config.failoverSleepBaseMillis,
+              config.failoverSleepMaxMillis));
+    }
+    return null;
+  }
 }
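createFailoverProxyProvider above reflects on the provider's single-argument Class constructor; doing the equivalent by hand would look roughly like this, assuming ConfiguredFailoverProxyProvider (which gains exactly such a constructor in this patch):

    FailoverProxyProvider<NamenodeProtocol> provider =
        new ConfiguredFailoverProxyProvider<NamenodeProtocol>(NamenodeProtocol.class);
    // The provider is Configurable; setConf injects the NN addresses to rotate through.
    ReflectionUtils.setConf(provider, conf);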

NameNodeConnector.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -58,7 +57,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 
 /**
@@ -88,7 +86,8 @@ class NameNodeConnector {
     InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
     // TODO(HA): need to deal with connecting to HA NN pair here
     this.namenodeAddress = nn;
-    this.namenode = createNamenode(nn, conf);
+    this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf,
+        UserGroupInformation.getCurrentUser());
 
     this.client = DFSUtil.createNamenode(conf);
     this.fs = FileSystem.get(NameNode.getUri(nn), conf);
@@ -196,33 +195,6 @@ class NameNodeConnector {
         + "]";
   }
 
-  /** Build a NamenodeProtocol connection to the namenode and
-   * set up the retry policy
-   */
-  private static NamenodeProtocol createNamenode(InetSocketAddress address,
-      Configuration conf) throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        timeoutPolicy, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-        new HashMap<String, RetryPolicy>();
-    methodNameToPolicyMap.put("getBlocks", methodPolicy);
-    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
-        RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
-        UserGroupInformation.getCurrentUser(), conf,
-        NetUtils.getDefaultSocketFactory(conf));
-    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
-        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
-    return new NamenodeProtocolTranslatorPB(retryProxy);
-  }
-
   /**
    * Periodically updates access keys.
    */

ConfiguredFailoverProxyProvider.java

@@ -32,52 +32,75 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A FailoverProxyProvider implementation which allows one to configure two URIs
  * to connect to during fail-over. The first configured address is tried first,
  * and on a fail-over event the other address is tried.
  */
-public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
-    Configurable {
+public class ConfiguredFailoverProxyProvider<T> implements
+    FailoverProxyProvider<T>, Configurable {
 
   private static final Log LOG =
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
 
   private Configuration conf;
   private int currentProxyIndex = 0;
-  private List<AddressRpcProxyPair> proxies = new ArrayList<AddressRpcProxyPair>();
+  private List<AddressRpcProxyPair<T>> proxies = new ArrayList<AddressRpcProxyPair<T>>();
   private UserGroupInformation ugi;
+  private final Class<T> xface;
+
+  public ConfiguredFailoverProxyProvider(Class<T> xface) {
+    Preconditions.checkArgument(
+        xface.isAssignableFrom(NamenodeProtocols.class),
+        "Interface class %s is not a valid NameNode protocol!", xface);
+    this.xface = xface;
+  }
 
   @Override
-  public Class<?> getInterface() {
-    return ClientProtocol.class;
+  public Class<T> getInterface() {
+    return xface;
   }
 
   /**
    * Lazily initialize the RPC proxy object.
    */
+  @SuppressWarnings("unchecked")
   @Override
-  public synchronized Object getProxy() {
+  public synchronized T getProxy() {
     AddressRpcProxyPair current = proxies.get(currentProxyIndex);
     if (current.namenode == null) {
       try {
-        // TODO(HA): This will create a NN proxy with an underlying retry
-        // proxy. We don't want this.
-        current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+        if (NamenodeProtocol.class.equals(xface)) {
+          current.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
+              current.address, conf, ugi);
+        } else if (ClientProtocol.class.equals(xface)) {
+          // TODO(HA): This will create a NN proxy with an underlying retry
+          // proxy. We don't want this.
+          current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+        } else {
+          throw new IllegalStateException(
+              "Unsupported protocol found when creating the proxy connection to NameNode. "
+              + ((xface != null) ? xface.getName() : xface)
+              + " is not supported by " + this.getClass().getName());
+        }
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);
       }
     }
-    return current.namenode;
+    return (T) current.namenode;
   }
 
   @Override
-  public synchronized void performFailover(Object currentProxy) {
+  public synchronized void performFailover(T currentProxy) {
     currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
   }
@@ -113,7 +136,7 @@ public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
       Map<String, InetSocketAddress> addressesInNN = map.get(nsId);
       for (InetSocketAddress address : addressesInNN.values()) {
-        proxies.add(new AddressRpcProxyPair(address));
+        proxies.add(new AddressRpcProxyPair<T>(address));
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -124,9 +147,9 @@ public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
    * A little pair object to store the address and connected RPC proxy object to
    * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
    */
-  private static class AddressRpcProxyPair {
+  private static class AddressRpcProxyPair<T> {
     public InetSocketAddress address;
-    public ClientProtocol namenode;
+    public T namenode;
 
     public AddressRpcProxyPair(InetSocketAddress address) {
       this.address = address;
@@ -139,7 +162,7 @@ public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
    */
   @Override
   public synchronized void close() throws IOException {
-    for (AddressRpcProxyPair proxy : proxies) {
+    for (AddressRpcProxyPair<T> proxy : proxies) {
       if (proxy.namenode != null) {
         if (proxy.namenode instanceof Closeable) {
           ((Closeable) proxy.namenode).close();