HDFS-13749. [SBN read] Use getServiceStatus to discover observer namenodes. Contributed by Chao Sun.
This commit is contained in:
parent
0de06c010a
commit
556b7a840b
|
@ -25,12 +25,16 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
|
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
|
||||||
import org.apache.hadoop.ipc.AlignmentContext;
|
import org.apache.hadoop.ipc.AlignmentContext;
|
||||||
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -62,8 +66,9 @@ import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create proxy objects with {@link ClientProtocol} to communicate with a remote
|
* Create proxy objects with {@link ClientProtocol} and
|
||||||
* NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
|
* {@link HAServiceProtocol} to communicate with a remote NN. For the former,
|
||||||
|
* generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
|
||||||
* Configuration, URI, AtomicBoolean)}, which will create either an HA- or
|
* Configuration, URI, AtomicBoolean)}, which will create either an HA- or
|
||||||
* non-HA-enabled client proxy as appropriate.
|
* non-HA-enabled client proxy as appropriate.
|
||||||
*
|
*
|
||||||
|
@ -76,6 +81,11 @@ public class NameNodeProxiesClient {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
NameNodeProxiesClient.class);
|
NameNodeProxiesClient.class);
|
||||||
|
|
||||||
|
/** Maximum # of retries for HAProxy with HAServiceProtocol. */
|
||||||
|
private static final int MAX_RETRIES = 3;
|
||||||
|
/** Initial retry delay for HAProxy with HAServiceProtocol. */
|
||||||
|
private static final int DELAY_MILLISECONDS = 200;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapper for a client proxy as well as its associated service ID.
|
* Wrapper for a client proxy as well as its associated service ID.
|
||||||
* This is simply used as a tuple-like return type for created NN proxy.
|
* This is simply used as a tuple-like return type for created NN proxy.
|
||||||
|
@ -342,6 +352,34 @@ public class NameNodeProxiesClient {
|
||||||
fallbackToSimpleAuth, null);
|
fallbackToSimpleAuth, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a non-HA proxy object with {@link HAServiceProtocol} to the
|
||||||
|
* given NameNode address, using the provided configuration. The proxy will
|
||||||
|
* use the RPC timeout configuration specified via {@link
|
||||||
|
* org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
|
||||||
|
* Upon failures, this will retry up to certain times with {@link RetryProxy}.
|
||||||
|
*
|
||||||
|
* @param address the NameNode address
|
||||||
|
* @param conf the configuration to be used
|
||||||
|
* @return a non-HA proxy with {@link HAServiceProtocol}.
|
||||||
|
*/
|
||||||
|
public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
|
||||||
|
InetSocketAddress address, Configuration conf) throws IOException {
|
||||||
|
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
|
||||||
|
MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
HAServiceProtocol proxy =
|
||||||
|
new HAServiceProtocolClientSideTranslatorPB(
|
||||||
|
address, conf, NetUtils.getDefaultSocketFactory(conf),
|
||||||
|
Client.getRpcTimeout(conf));
|
||||||
|
return (HAServiceProtocol) RetryProxy.create(
|
||||||
|
HAServiceProtocol.class,
|
||||||
|
new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
|
||||||
|
new HashMap<>(),
|
||||||
|
timeoutPolicy
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
public static ClientProtocol createProxyWithAlignmentContext(
|
public static ClientProtocol createProxyWithAlignmentContext(
|
||||||
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
||||||
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
|
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
|
||||||
|
|
|
@ -28,11 +28,14 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||||
|
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -119,23 +122,44 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
*/
|
*/
|
||||||
private HAServiceState cachedState;
|
private HAServiceState cachedState;
|
||||||
|
|
||||||
public NNProxyInfo(InetSocketAddress address) {
|
/** Proxy for getting HA service status from the given NameNode. */
|
||||||
|
private HAServiceProtocol serviceProxy;
|
||||||
|
|
||||||
|
public NNProxyInfo(InetSocketAddress address, Configuration conf) {
|
||||||
super(null, address.toString());
|
super(null, address.toString());
|
||||||
this.address = address;
|
this.address = address;
|
||||||
|
try {
|
||||||
|
serviceProxy = NameNodeProxiesClient
|
||||||
|
.createNonHAProxyWithHAServiceProtocol(address, conf);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
|
||||||
|
" at {}", address, ioe);
|
||||||
|
throw new RuntimeException(ioe);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public InetSocketAddress getAddress() {
|
public InetSocketAddress getAddress() {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCachedState(HAServiceState state) {
|
public void refreshCachedState() {
|
||||||
cachedState = state;
|
try {
|
||||||
|
cachedState = serviceProxy.getServiceStatus().getState();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to connect to {}. Setting cached state to Standby",
|
||||||
|
address, e);
|
||||||
|
cachedState = HAServiceState.STANDBY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public HAServiceState getCachedState() {
|
public HAServiceState getCachedState() {
|
||||||
return cachedState;
|
return cachedState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setServiceProxyForTesting(HAServiceProtocol proxy) {
|
||||||
|
this.serviceProxy = proxy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -153,8 +177,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
pi.proxy = factory.createProxy(conf,
|
pi.proxy = factory.createProxy(conf,
|
||||||
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
|
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("{} Failed to create RPC proxy to NameNode",
|
LOG.error("{} Failed to create RPC proxy to NameNode at {}",
|
||||||
this.getClass().getSimpleName(), ioe);
|
this.getClass().getSimpleName(), pi.address, ioe);
|
||||||
throw new RuntimeException(ioe);
|
throw new RuntimeException(ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -178,7 +202,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
|
|
||||||
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
|
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
|
||||||
for (InetSocketAddress address : addressesOfNns) {
|
for (InetSocketAddress address : addressesOfNns) {
|
||||||
proxies.add(new NNProxyInfo<T>(address));
|
proxies.add(new NNProxyInfo<T>(address, conf));
|
||||||
}
|
}
|
||||||
// Randomize the list to prevent all clients pointing to the same one
|
// Randomize the list to prevent all clients pointing to the same one
|
||||||
boolean randomized = getRandomOrder(conf, uri);
|
boolean randomized = getRandomOrder(conf, uri);
|
||||||
|
|
|
@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
|
||||||
public IPFailoverProxyProvider(Configuration conf, URI uri,
|
public IPFailoverProxyProvider(Configuration conf, URI uri,
|
||||||
Class<T> xface, HAProxyFactory<T> factory) {
|
Class<T> xface, HAProxyFactory<T> factory) {
|
||||||
super(conf, uri, xface, factory);
|
super(conf, uri, xface, factory);
|
||||||
this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
|
this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,12 +27,10 @@ import java.net.URI;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.ClientGSIContext;
|
import org.apache.hadoop.hdfs.ClientGSIContext;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
||||||
import org.apache.hadoop.io.retry.AtMostOnce;
|
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||||
import org.apache.hadoop.io.retry.Idempotent;
|
import org.apache.hadoop.io.retry.Idempotent;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
@ -40,8 +38,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||||
import org.apache.hadoop.ipc.AlignmentContext;
|
import org.apache.hadoop.ipc.AlignmentContext;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -181,49 +177,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
return lastProxy;
|
return lastProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends ClientProtocol> HAServiceState getServiceState(
|
|
||||||
NNProxyInfo<T> pi) {
|
|
||||||
// TODO: should introduce new ClientProtocol method to verify the
|
|
||||||
// underlying service state, which does not require superuser access
|
|
||||||
// The is a workaround
|
|
||||||
IOException ioe = null;
|
|
||||||
try {
|
|
||||||
// Verify write access first
|
|
||||||
pi.proxy.reportBadBlocks(new LocatedBlock[0]);
|
|
||||||
return HAServiceState.ACTIVE; // Only active NameNode allows write
|
|
||||||
} catch (RemoteException re) {
|
|
||||||
IOException sbe = re.unwrapRemoteException(StandbyException.class);
|
|
||||||
if (!(sbe instanceof StandbyException)) {
|
|
||||||
ioe = re;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
ioe = e;
|
|
||||||
}
|
|
||||||
if (ioe != null) {
|
|
||||||
LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
|
|
||||||
return HAServiceState.STANDBY; // Just assume standby in this case
|
|
||||||
// Anything besides observer is fine
|
|
||||||
}
|
|
||||||
// Verify read access
|
|
||||||
// For now we assume only Observer nodes allow reads
|
|
||||||
// Stale reads on StandbyNode should be turned off
|
|
||||||
try {
|
|
||||||
pi.proxy.checkAccess("/", FsAction.READ);
|
|
||||||
return HAServiceState.OBSERVER;
|
|
||||||
} catch (RemoteException re) {
|
|
||||||
IOException sbe = re.unwrapRemoteException(StandbyException.class);
|
|
||||||
if (!(sbe instanceof StandbyException)) {
|
|
||||||
ioe = re;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
ioe = e;
|
|
||||||
}
|
|
||||||
if (ioe != null) {
|
|
||||||
LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
|
|
||||||
}
|
|
||||||
return HAServiceState.STANDBY;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the currently used proxy. If there is none, first calls
|
* Return the currently used proxy. If there is none, first calls
|
||||||
* {@link #changeProxy(NNProxyInfo)} to initialize one.
|
* {@link #changeProxy(NNProxyInfo)} to initialize one.
|
||||||
|
@ -254,7 +207,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
currentProxy = null;
|
currentProxy = null;
|
||||||
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
|
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
|
||||||
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
|
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
|
||||||
currentProxy.setCachedState(getServiceState(currentProxy));
|
currentProxy.refreshCachedState();
|
||||||
LOG.debug("Changed current proxy from {} to {}",
|
LOG.debug("Changed current proxy from {} to {}",
|
||||||
initial == null ? "none" : initial.proxyInfo,
|
initial == null ? "none" : initial.proxyInfo,
|
||||||
currentProxy.proxyInfo);
|
currentProxy.proxyInfo);
|
||||||
|
|
|
@ -22,10 +22,13 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.ha.HAServiceStatus;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -38,10 +41,12 @@ import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for {@link ObserverReadProxyProvider} under various configurations of
|
* Tests for {@link ObserverReadProxyProvider} under various configurations of
|
||||||
|
@ -56,7 +61,7 @@ public class TestObserverReadProxyProvider {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
|
private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
|
||||||
private ClientProtocolAnswer[] namenodeAnswers;
|
private NameNodeAnswer[] namenodeAnswers;
|
||||||
private String[] namenodeAddrs;
|
private String[] namenodeAddrs;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -70,32 +75,53 @@ public class TestObserverReadProxyProvider {
|
||||||
private void setupProxyProvider(int namenodeCount) throws Exception {
|
private void setupProxyProvider(int namenodeCount) throws Exception {
|
||||||
String[] namenodeIDs = new String[namenodeCount];
|
String[] namenodeIDs = new String[namenodeCount];
|
||||||
namenodeAddrs = new String[namenodeCount];
|
namenodeAddrs = new String[namenodeCount];
|
||||||
namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
|
namenodeAnswers = new NameNodeAnswer[namenodeCount];
|
||||||
ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
|
ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
|
||||||
Map<String, ClientProtocol> proxyMap = new HashMap<>();
|
Map<String, ClientProtocol> proxyMap = new HashMap<>();
|
||||||
|
HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
|
||||||
|
Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
|
||||||
for (int i = 0; i < namenodeCount; i++) {
|
for (int i = 0; i < namenodeCount; i++) {
|
||||||
namenodeIDs[i] = "nn" + i;
|
namenodeIDs[i] = "nn" + i;
|
||||||
namenodeAddrs[i] = "namenode" + i + ".test:8020";
|
namenodeAddrs[i] = "namenode" + i + ".test:8020";
|
||||||
conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
|
conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
|
||||||
"." + namenodeIDs[i], namenodeAddrs[i]);
|
"." + namenodeIDs[i], namenodeAddrs[i]);
|
||||||
namenodeAnswers[i] = new ClientProtocolAnswer();
|
namenodeAnswers[i] = new NameNodeAnswer();
|
||||||
proxies[i] = mock(ClientProtocol.class);
|
proxies[i] = mock(ClientProtocol.class);
|
||||||
doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
|
doWrite(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
|
||||||
doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
|
.when(proxies[i]));
|
||||||
|
doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
|
||||||
|
.when(proxies[i]));
|
||||||
|
serviceProxies[i] = mock(HAServiceProtocol.class);
|
||||||
|
Mockito.doAnswer(namenodeAnswers[i].serviceAnswer)
|
||||||
|
.when(serviceProxies[i]).getServiceStatus();
|
||||||
proxyMap.put(namenodeAddrs[i], proxies[i]);
|
proxyMap.put(namenodeAddrs[i], proxies[i]);
|
||||||
|
serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
|
||||||
}
|
}
|
||||||
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
|
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
|
||||||
Joiner.on(",").join(namenodeIDs));
|
Joiner.on(",").join(namenodeIDs));
|
||||||
proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
|
proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI,
|
||||||
ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
|
ClientProtocol.class,
|
||||||
|
new ClientHAProxyFactory<ClientProtocol>() {
|
||||||
|
@Override
|
||||||
|
public ClientProtocol createProxy(Configuration config,
|
||||||
|
InetSocketAddress nnAddr, Class<ClientProtocol> xface,
|
||||||
|
UserGroupInformation ugi, boolean withRetries,
|
||||||
|
AtomicBoolean fallbackToSimpleAuth) {
|
||||||
|
return proxyMap.get(nnAddr.toString());
|
||||||
|
}
|
||||||
|
}) {
|
||||||
@Override
|
@Override
|
||||||
public ClientProtocol createProxy(Configuration conf,
|
protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
|
||||||
InetSocketAddress nnAddr, Class<ClientProtocol> xface,
|
URI uri, String addressKey) {
|
||||||
UserGroupInformation ugi, boolean withRetries,
|
List<NNProxyInfo<ClientProtocol>> nnProxies =
|
||||||
AtomicBoolean fallbackToSimpleAuth) {
|
super.getProxyAddresses(uri, addressKey);
|
||||||
return proxyMap.get(nnAddr.toString());
|
for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
|
||||||
|
String addressStr = nnProxy.getAddress().toString();
|
||||||
|
nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
|
||||||
|
}
|
||||||
|
return nnProxies;
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
proxyProvider.setObserverReadEnabled(true);
|
proxyProvider.setObserverReadEnabled(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,39 +301,62 @@ public class TestObserverReadProxyProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
|
* An {@link Answer} used for mocking of {@link ClientProtocol} and
|
||||||
* the state or unreachability of this Answer will make the linked
|
* {@link HAServiceProtocol}. Setting the state or unreachability of this
|
||||||
* ClientProtocol respond as if it was communicating with a NameNode of
|
* Answer will make the linked ClientProtocol respond as if it was
|
||||||
* the corresponding state. It is in Standby state by default.
|
* communicating with a NameNode of the corresponding state. It is in Standby
|
||||||
|
* state by default.
|
||||||
*/
|
*/
|
||||||
private static class ClientProtocolAnswer implements Answer<Void> {
|
private static class NameNodeAnswer {
|
||||||
|
|
||||||
private volatile boolean unreachable = false;
|
private volatile boolean unreachable = false;
|
||||||
// Standby state by default
|
// Standby state by default
|
||||||
private volatile boolean allowWrites = false;
|
private volatile boolean allowWrites = false;
|
||||||
private volatile boolean allowReads = false;
|
private volatile boolean allowReads = false;
|
||||||
|
|
||||||
@Override
|
private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
|
||||||
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
private HAServiceProtocolAnswer serviceAnswer =
|
||||||
if (unreachable) {
|
new HAServiceProtocolAnswer();
|
||||||
throw new IOException("Unavailable");
|
|
||||||
|
private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
|
||||||
|
@Override
|
||||||
|
public HAServiceStatus answer(InvocationOnMock invocation)
|
||||||
|
throws Throwable {
|
||||||
|
HAServiceStatus status = mock(HAServiceStatus.class);
|
||||||
|
if (allowReads && allowWrites) {
|
||||||
|
when(status.getState()).thenReturn(HAServiceState.ACTIVE);
|
||||||
|
} else if (allowReads) {
|
||||||
|
when(status.getState()).thenReturn(HAServiceState.OBSERVER);
|
||||||
|
} else {
|
||||||
|
when(status.getState()).thenReturn(HAServiceState.STANDBY);
|
||||||
|
}
|
||||||
|
return status;
|
||||||
}
|
}
|
||||||
switch (invocationOnMock.getMethod().getName()) {
|
}
|
||||||
|
|
||||||
|
private class ClientProtocolAnswer implements Answer<Void> {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
if (unreachable) {
|
||||||
|
throw new IOException("Unavailable");
|
||||||
|
}
|
||||||
|
switch (invocationOnMock.getMethod().getName()) {
|
||||||
case "reportBadBlocks":
|
case "reportBadBlocks":
|
||||||
if (!allowWrites) {
|
if (!allowWrites) {
|
||||||
throw new RemoteException(StandbyException.class.getCanonicalName(),
|
throw new RemoteException(
|
||||||
"No writes!");
|
StandbyException.class.getCanonicalName(), "No writes!");
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
case "checkAccess":
|
case "checkAccess":
|
||||||
if (!allowReads) {
|
if (!allowReads) {
|
||||||
throw new RemoteException(StandbyException.class.getCanonicalName(),
|
throw new RemoteException(
|
||||||
"No reads!");
|
StandbyException.class.getCanonicalName(), "No reads!");
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Only reportBadBlocks and checkAccess supported!");
|
"Only reportBadBlocks and checkAccess supported!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue