HDFS-13779. [SBN read] Implement proper failover and observer failure handling logic for for ObserverReadProxyProvider. Contributed by Erik Krogen.
This commit is contained in:
parent
f9fc01cd7f
commit
aa42fb0db7
|
@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
@ -111,6 +112,12 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
*/
|
*/
|
||||||
public static class NNProxyInfo<T> extends ProxyInfo<T> {
|
public static class NNProxyInfo<T> extends ProxyInfo<T> {
|
||||||
private InetSocketAddress address;
|
private InetSocketAddress address;
|
||||||
|
/**
|
||||||
|
* The currently known state of the NameNode represented by this ProxyInfo.
|
||||||
|
* This may be out of date if the NameNode has changed state since the last
|
||||||
|
* time the state was checked.
|
||||||
|
*/
|
||||||
|
private HAServiceState cachedState;
|
||||||
|
|
||||||
public NNProxyInfo(InetSocketAddress address) {
|
public NNProxyInfo(InetSocketAddress address) {
|
||||||
super(null, address.toString());
|
super(null, address.toString());
|
||||||
|
@ -120,6 +127,15 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
public InetSocketAddress getAddress() {
|
public InetSocketAddress getAddress() {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setCachedState(HAServiceState state) {
|
||||||
|
cachedState = state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HAServiceState getCachedState() {
|
||||||
|
return cachedState;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,18 +20,24 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.InvocationHandler;
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.ClientGSIContext;
|
import org.apache.hadoop.hdfs.ClientGSIContext;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.io.retry.AtMostOnce;
|
||||||
|
import org.apache.hadoop.io.retry.Idempotent;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||||
import org.apache.hadoop.ipc.AlignmentContext;
|
import org.apache.hadoop.ipc.AlignmentContext;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -59,16 +65,18 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
ObserverReadProxyProvider.class);
|
ObserverReadProxyProvider.class);
|
||||||
|
|
||||||
/** Client-side context for syncing with the NameNode server side */
|
/** Client-side context for syncing with the NameNode server side. */
|
||||||
private AlignmentContext alignmentContext;
|
private final AlignmentContext alignmentContext;
|
||||||
|
|
||||||
private AbstractNNFailoverProxyProvider<T> failoverProxy;
|
/** The inner proxy provider used for active/standby failover. */
|
||||||
/** All NameNdoe proxies */
|
private final AbstractNNFailoverProxyProvider<T> failoverProxy;
|
||||||
private List<NNProxyInfo<T>> nameNodeProxies =
|
/** List of all NameNode proxies. */
|
||||||
new ArrayList<NNProxyInfo<T>>();
|
private final List<NNProxyInfo<T>> nameNodeProxies;
|
||||||
/** Proxies for the observer namenodes */
|
|
||||||
private final List<NNProxyInfo<T>> observerProxies =
|
/** The policy used to determine if an exception is fatal or retriable. */
|
||||||
new ArrayList<NNProxyInfo<T>>();
|
private final RetryPolicy observerRetryPolicy;
|
||||||
|
/** The combined proxy which redirects to other proxies as necessary. */
|
||||||
|
private final ProxyInfo<T> combinedProxy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether reading from observer is enabled. If this is false, all read
|
* Whether reading from observer is enabled. If this is false, all read
|
||||||
|
@ -77,12 +85,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
private boolean observerReadEnabled;
|
private boolean observerReadEnabled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread-local index to record the current index in the observer list.
|
* The index into the nameNodeProxies list currently being used. Should only
|
||||||
|
* be accessed in synchronized methods.
|
||||||
*/
|
*/
|
||||||
private static final ThreadLocal<Integer> currentIndex =
|
private int currentIndex = -1;
|
||||||
ThreadLocal.withInitial(() -> 0);
|
/**
|
||||||
|
* The proxy being used currently; this will match with currentIndex above.
|
||||||
|
* This field is volatile to allow reads without synchronization; updates
|
||||||
|
* should still be performed synchronously to maintain consistency between
|
||||||
|
* currentIndex and this field.
|
||||||
|
*/
|
||||||
|
private volatile NNProxyInfo<T> currentProxy;
|
||||||
|
|
||||||
/** The last proxy that has been used. Only used for testing */
|
/** The last proxy that has been used. Only used for testing. */
|
||||||
private volatile ProxyInfo<T> lastProxy = null;
|
private volatile ProxyInfo<T> lastProxy = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -90,63 +105,53 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
* {@link ConfiguredFailoverProxyProvider} for failover.
|
* {@link ConfiguredFailoverProxyProvider} for failover.
|
||||||
*/
|
*/
|
||||||
public ObserverReadProxyProvider(
|
public ObserverReadProxyProvider(
|
||||||
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory)
|
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
|
||||||
throws IOException {
|
|
||||||
this(conf, uri, xface, factory,
|
this(conf, uri, xface, factory,
|
||||||
new ConfiguredFailoverProxyProvider<T>(conf, uri, xface,factory));
|
new ConfiguredFailoverProxyProvider<>(conf, uri, xface,factory));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public ObserverReadProxyProvider(
|
public ObserverReadProxyProvider(
|
||||||
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
|
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
|
||||||
AbstractNNFailoverProxyProvider<T> failoverProxy)
|
AbstractNNFailoverProxyProvider<T> failoverProxy) {
|
||||||
throws IOException {
|
|
||||||
super(conf, uri, xface, factory);
|
super(conf, uri, xface, factory);
|
||||||
this.failoverProxy = failoverProxy;
|
this.failoverProxy = failoverProxy;
|
||||||
this.alignmentContext = new ClientGSIContext();
|
this.alignmentContext = new ClientGSIContext();
|
||||||
((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
|
((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
|
||||||
|
|
||||||
|
// Don't bother configuring the number of retries and such on the retry
|
||||||
|
// policy since it is mainly only used for determining whether or not an
|
||||||
|
// exception is retriable or fatal
|
||||||
|
observerRetryPolicy = RetryPolicies.failoverOnNetworkException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, 1);
|
||||||
|
|
||||||
// Get all NameNode proxies
|
// Get all NameNode proxies
|
||||||
nameNodeProxies = getProxyAddresses(uri,
|
nameNodeProxies = getProxyAddresses(uri,
|
||||||
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
// Find out all the observer proxies
|
|
||||||
for (NNProxyInfo<T> pi : nameNodeProxies) {
|
|
||||||
createProxyIfNeeded(pi);
|
|
||||||
if (isObserverState(pi)) {
|
|
||||||
observerProxies.add(pi);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: No observers is not an error
|
// Create a wrapped proxy containing all the proxies. Since this combined
|
||||||
// Just direct all reads go to the active NameNode
|
// proxy is just redirecting to other proxies, all invocations can share it.
|
||||||
if (observerProxies.isEmpty()) {
|
|
||||||
throw new RuntimeException("Couldn't find any namenode proxy in " +
|
|
||||||
"OBSERVER state");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized AlignmentContext getAlignmentContext() {
|
|
||||||
return alignmentContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
public synchronized ProxyInfo<T> getProxy() {
|
|
||||||
// We just create a wrapped proxy containing all the proxies
|
|
||||||
StringBuilder combinedInfo = new StringBuilder("[");
|
StringBuilder combinedInfo = new StringBuilder("[");
|
||||||
|
for (int i = 0; i < nameNodeProxies.size(); i++) {
|
||||||
for (int i = 0; i < this.observerProxies.size(); i++) {
|
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
combinedInfo.append(",");
|
combinedInfo.append(",");
|
||||||
}
|
}
|
||||||
combinedInfo.append(observerProxies.get(i).proxyInfo);
|
combinedInfo.append(nameNodeProxies.get(i).proxyInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
combinedInfo.append(']');
|
combinedInfo.append(']');
|
||||||
T wrappedProxy = (T) Proxy.newProxyInstance(
|
T wrappedProxy = (T) Proxy.newProxyInstance(
|
||||||
ObserverReadInvocationHandler.class.getClassLoader(),
|
ObserverReadInvocationHandler.class.getClassLoader(),
|
||||||
new Class<?>[]{xface},
|
new Class<?>[] { xface }, new ObserverReadInvocationHandler());
|
||||||
new ObserverReadInvocationHandler(observerProxies));
|
combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
|
||||||
return new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
|
}
|
||||||
|
|
||||||
|
public AlignmentContext getAlignmentContext() {
|
||||||
|
return alignmentContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProxyInfo<T> getProxy() {
|
||||||
|
return combinedProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -159,8 +164,11 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
*
|
*
|
||||||
* @return whether the 'method' is a read-only operation.
|
* @return whether the 'method' is a read-only operation.
|
||||||
*/
|
*/
|
||||||
private boolean isRead(Method method) {
|
private static boolean isRead(Method method) {
|
||||||
return method.isAnnotationPresent(ReadOnly.class);
|
if (!method.isAnnotationPresent(ReadOnly.class)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -168,21 +176,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
this.observerReadEnabled = flag;
|
this.observerReadEnabled = flag;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* After getting exception 'ex', whether we should retry the current request
|
|
||||||
* on a different observer.
|
|
||||||
*/
|
|
||||||
private boolean shouldRetry(Exception ex) throws Exception {
|
|
||||||
// TODO: implement retry policy
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
ProxyInfo<T> getLastProxy() {
|
ProxyInfo<T> getLastProxy() {
|
||||||
return lastProxy;
|
return lastProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isObserverState(NNProxyInfo<T> pi) {
|
private static <T extends ClientProtocol> HAServiceState getServiceState(
|
||||||
|
NNProxyInfo<T> pi) {
|
||||||
// TODO: should introduce new ClientProtocol method to verify the
|
// TODO: should introduce new ClientProtocol method to verify the
|
||||||
// underlying service state, which does not require superuser access
|
// underlying service state, which does not require superuser access
|
||||||
// The is a workaround
|
// The is a workaround
|
||||||
|
@ -190,7 +190,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
try {
|
try {
|
||||||
// Verify write access first
|
// Verify write access first
|
||||||
pi.proxy.reportBadBlocks(new LocatedBlock[0]);
|
pi.proxy.reportBadBlocks(new LocatedBlock[0]);
|
||||||
return false; // Only active NameNode allows write
|
return HAServiceState.ACTIVE; // Only active NameNode allows write
|
||||||
} catch (RemoteException re) {
|
} catch (RemoteException re) {
|
||||||
IOException sbe = re.unwrapRemoteException(StandbyException.class);
|
IOException sbe = re.unwrapRemoteException(StandbyException.class);
|
||||||
if (!(sbe instanceof StandbyException)) {
|
if (!(sbe instanceof StandbyException)) {
|
||||||
|
@ -200,15 +200,16 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
ioe = e;
|
ioe = e;
|
||||||
}
|
}
|
||||||
if (ioe != null) {
|
if (ioe != null) {
|
||||||
LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
|
LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
|
||||||
return false;
|
return HAServiceState.STANDBY; // Just assume standby in this case
|
||||||
|
// Anything besides observer is fine
|
||||||
}
|
}
|
||||||
// Verify read access
|
// Verify read access
|
||||||
// For now we assume only Observer nodes allow reads
|
// For now we assume only Observer nodes allow reads
|
||||||
// Stale reads on StandbyNode should be turned off
|
// Stale reads on StandbyNode should be turned off
|
||||||
try {
|
try {
|
||||||
pi.proxy.checkAccess("/", FsAction.READ);
|
pi.proxy.checkAccess("/", FsAction.READ);
|
||||||
return true;
|
return HAServiceState.OBSERVER;
|
||||||
} catch (RemoteException re) {
|
} catch (RemoteException re) {
|
||||||
IOException sbe = re.unwrapRemoteException(StandbyException.class);
|
IOException sbe = re.unwrapRemoteException(StandbyException.class);
|
||||||
if (!(sbe instanceof StandbyException)) {
|
if (!(sbe instanceof StandbyException)) {
|
||||||
|
@ -218,29 +219,60 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
ioe = e;
|
ioe = e;
|
||||||
}
|
}
|
||||||
if (ioe != null) {
|
if (ioe != null) {
|
||||||
LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
|
LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
|
||||||
}
|
}
|
||||||
return false;
|
return HAServiceState.STANDBY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
class ObserverReadInvocationHandler implements InvocationHandler {
|
* Return the currently used proxy. If there is none, first calls
|
||||||
final List<NNProxyInfo<T>> observerProxies;
|
* {@link #changeProxy(NNProxyInfo)} to initialize one.
|
||||||
final ProxyInfo<T> activeProxy;
|
*/
|
||||||
|
private NNProxyInfo<T> getCurrentProxy() {
|
||||||
ObserverReadInvocationHandler(List<NNProxyInfo<T>> observerProxies) {
|
if (currentProxy == null) {
|
||||||
this.observerProxies = observerProxies;
|
changeProxy(null);
|
||||||
this.activeProxy = failoverProxy.getProxy();
|
|
||||||
}
|
}
|
||||||
|
return currentProxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move to the next proxy in the proxy list. If the NNProxyInfo supplied by
|
||||||
|
* the caller does not match the current proxy, the call is ignored; this is
|
||||||
|
* to handle concurrent calls (to avoid changing the proxy multiple times).
|
||||||
|
* The service state of the newly selected proxy will be updated before
|
||||||
|
* returning.
|
||||||
|
*
|
||||||
|
* @param initial The expected current proxy
|
||||||
|
*/
|
||||||
|
private synchronized void changeProxy(NNProxyInfo<T> initial) {
|
||||||
|
if (currentProxy != initial) {
|
||||||
|
// Must have been a concurrent modification; ignore the move request
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Attempt to force concurrent callers of getCurrentProxy to wait for the
|
||||||
|
// new proxy; best-effort by setting currentProxy to null
|
||||||
|
currentProxy = null;
|
||||||
|
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
|
||||||
|
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
|
||||||
|
currentProxy.setCachedState(getServiceState(currentProxy));
|
||||||
|
LOG.debug("Changed current proxy from {} to {}",
|
||||||
|
initial == null ? "none" : initial.proxyInfo,
|
||||||
|
currentProxy.proxyInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An InvocationHandler to handle incoming requests. This class's invoke
|
||||||
|
* method contains the primary logic for redirecting to observers.
|
||||||
|
*
|
||||||
|
* If observer reads are enabled, attempt to send read operations to the
|
||||||
|
* current proxy. If it is not an observer, or the observer fails, adjust
|
||||||
|
* the current proxy and retry on the next one. If all proxies are tried
|
||||||
|
* without success, the request is forwarded to the active.
|
||||||
|
*
|
||||||
|
* Write requests are always forwarded to the active.
|
||||||
|
*/
|
||||||
|
private class ObserverReadInvocationHandler implements InvocationHandler {
|
||||||
|
|
||||||
/**
|
|
||||||
* Sends read operations to the observer (if enabled) specified by the
|
|
||||||
* current index, and send write operations to the active. If a observer
|
|
||||||
* fails, we increment the index and retry the next one. If all observers
|
|
||||||
* fail, the request is forwarded to the active.
|
|
||||||
*
|
|
||||||
* Write requests are always forwarded to the active.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Object invoke(Object proxy, final Method method, final Object[] args)
|
public Object invoke(Object proxy, final Method method, final Object[] args)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
|
@ -248,33 +280,65 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
Object retVal;
|
Object retVal;
|
||||||
|
|
||||||
if (observerReadEnabled && isRead(method)) {
|
if (observerReadEnabled && isRead(method)) {
|
||||||
// Loop through all the proxies, starting from the current index.
|
int failedObserverCount = 0;
|
||||||
for (int i = 0; i < observerProxies.size(); i++) {
|
int activeCount = 0;
|
||||||
NNProxyInfo<T> current = observerProxies.get(currentIndex.get());
|
int standbyCount = 0;
|
||||||
|
for (int i = 0; i < nameNodeProxies.size(); i++) {
|
||||||
|
NNProxyInfo<T> current = getCurrentProxy();
|
||||||
|
HAServiceState currState = current.getCachedState();
|
||||||
|
if (currState != HAServiceState.OBSERVER) {
|
||||||
|
if (currState == HAServiceState.ACTIVE) {
|
||||||
|
activeCount++;
|
||||||
|
} else if (currState == HAServiceState.STANDBY) {
|
||||||
|
standbyCount++;
|
||||||
|
}
|
||||||
|
LOG.debug("Skipping proxy {} for {} because it is in state {}",
|
||||||
|
current.proxyInfo, method.getName(), currState);
|
||||||
|
changeProxy(current);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.debug("Attempting to service {} using proxy {}",
|
||||||
|
method.getName(), current.proxyInfo);
|
||||||
try {
|
try {
|
||||||
retVal = method.invoke(current.proxy, args);
|
retVal = method.invoke(current.proxy, args);
|
||||||
lastProxy = current;
|
lastProxy = current;
|
||||||
|
LOG.debug("Invocation of {} using {} was successful",
|
||||||
|
method.getName(), current.proxyInfo);
|
||||||
return retVal;
|
return retVal;
|
||||||
} catch (Exception e) {
|
} catch (InvocationTargetException ite) {
|
||||||
if (!shouldRetry(e)) {
|
if (!(ite.getCause() instanceof Exception)) {
|
||||||
throw e;
|
throw ite.getCause();
|
||||||
|
}
|
||||||
|
Exception e = (Exception) ite.getCause();
|
||||||
|
RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
|
||||||
|
method.isAnnotationPresent(Idempotent.class)
|
||||||
|
|| method.isAnnotationPresent(AtMostOnce.class));
|
||||||
|
if (retryInfo.action == RetryAction.RetryDecision.FAIL) {
|
||||||
|
throw e;
|
||||||
|
} else {
|
||||||
|
failedObserverCount++;
|
||||||
|
LOG.warn(
|
||||||
|
"Invocation returned exception on [{}]; {} failure(s) so far",
|
||||||
|
current.proxyInfo, failedObserverCount, e);
|
||||||
|
changeProxy(current);
|
||||||
}
|
}
|
||||||
currentIndex.set((currentIndex.get() + 1) % observerProxies.size());
|
|
||||||
LOG.warn("Invocation returned exception on [{}]",
|
|
||||||
current.proxyInfo, e.getCause());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we get here, it means all observers have failed.
|
// If we get here, it means all observers have failed.
|
||||||
LOG.warn("All observers have failed for read request {}. " +
|
LOG.warn("{} observers have failed for read request {}; also found " +
|
||||||
"Fall back on active: {}", method.getName(), activeProxy);
|
"{} standby and {} active. Falling back to active.",
|
||||||
|
failedObserverCount, standbyCount, activeCount, method.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Either all observers have failed, or that it is a write request.
|
// Either all observers have failed, or that it is a write request.
|
||||||
// In either case, we'll forward the request to active NameNode.
|
// In either case, we'll forward the request to active NameNode.
|
||||||
|
LOG.debug("Using failoverProxy to service {}", method.getName());
|
||||||
|
ProxyInfo<T> activeProxy = failoverProxy.getProxy();
|
||||||
try {
|
try {
|
||||||
retVal = method.invoke(activeProxy.proxy, args);
|
retVal = method.invoke(activeProxy.proxy, args);
|
||||||
} catch (Exception e) {
|
} catch (InvocationTargetException e) {
|
||||||
|
// This exception will be handled by higher layers
|
||||||
throw e.getCause();
|
throw e.getCause();
|
||||||
}
|
}
|
||||||
lastProxy = activeProxy;
|
lastProxy = activeProxy;
|
||||||
|
@ -284,7 +348,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
failoverProxy.close();
|
|
||||||
for (ProxyInfo<T> pi : nameNodeProxies) {
|
for (ProxyInfo<T> pi : nameNodeProxies) {
|
||||||
if (pi.proxy != null) {
|
if (pi.proxy != null) {
|
||||||
if (pi.proxy instanceof Closeable) {
|
if (pi.proxy instanceof Closeable) {
|
||||||
|
@ -292,8 +355,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
||||||
} else {
|
} else {
|
||||||
RPC.stopProxy(pi.proxy);
|
RPC.stopProxy(pi.proxy);
|
||||||
}
|
}
|
||||||
|
// Set to null to avoid the failoverProxy having to re-do the close
|
||||||
|
// if it is sharing a proxy instance
|
||||||
|
pi.proxy = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
failoverProxy.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,22 +27,23 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
import org.apache.hadoop.io.retry.RetryInvocationHandler;
|
import org.apache.hadoop.io.retry.RetryInvocationHandler;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -200,6 +201,9 @@ public class TestObserverNode {
|
||||||
// Start the observer again - requests should go to observer
|
// Start the observer again - requests should go to observer
|
||||||
dfsCluster.restartNameNode(2);
|
dfsCluster.restartNameNode(2);
|
||||||
dfsCluster.transitionToObserver(2);
|
dfsCluster.transitionToObserver(2);
|
||||||
|
// The first request goes to the active because it has not refreshed yet;
|
||||||
|
// the second will properly go to the observer
|
||||||
|
dfs.getFileStatus(testPath);
|
||||||
dfs.getFileStatus(testPath);
|
dfs.getFileStatus(testPath);
|
||||||
assertSentTo(2);
|
assertSentTo(2);
|
||||||
}
|
}
|
||||||
|
@ -231,6 +235,9 @@ public class TestObserverNode {
|
||||||
|
|
||||||
dfsCluster.transitionToObserver(2);
|
dfsCluster.transitionToObserver(2);
|
||||||
dfs.getFileStatus(testPath);
|
dfs.getFileStatus(testPath);
|
||||||
|
// The first request goes to the active because it has not refreshed yet;
|
||||||
|
// the second will properly go to the observer
|
||||||
|
dfs.getFileStatus(testPath);
|
||||||
assertSentTo(2);
|
assertSentTo(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,6 +298,10 @@ public class TestObserverNode {
|
||||||
assertEquals(0, rc);
|
assertEquals(0, rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO this does not currently work because fetching the service state from
|
||||||
|
// e.g. the StandbyNameNode also waits for the transaction ID to catch up.
|
||||||
|
// This is disabled pending HDFS-13872 and HDFS-13749.
|
||||||
|
@Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed")
|
||||||
@Test
|
@Test
|
||||||
public void testMsyncSimple() throws Exception {
|
public void testMsyncSimple() throws Exception {
|
||||||
// disable fast path here because this test's assertions are based on the
|
// disable fast path here because this test's assertions are based on the
|
||||||
|
@ -304,7 +315,8 @@ public class TestObserverNode {
|
||||||
setUpCluster(1);
|
setUpCluster(1);
|
||||||
setObserverRead(true);
|
setObserverRead(true);
|
||||||
|
|
||||||
AtomicBoolean readSucceed = new AtomicBoolean(false);
|
// 0 == not completed, 1 == succeeded, -1 == failed
|
||||||
|
AtomicInteger readStatus = new AtomicInteger(0);
|
||||||
|
|
||||||
dfs.mkdir(testPath, FsPermission.getDefault());
|
dfs.mkdir(testPath, FsPermission.getDefault());
|
||||||
assertSentTo(0);
|
assertSentTo(0);
|
||||||
|
@ -313,20 +325,21 @@ public class TestObserverNode {
|
||||||
try {
|
try {
|
||||||
// this read will block until roll and tail edits happen.
|
// this read will block until roll and tail edits happen.
|
||||||
dfs.getFileStatus(testPath);
|
dfs.getFileStatus(testPath);
|
||||||
readSucceed.set(true);
|
readStatus.set(1);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
readStatus.set(-1);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
reader.start();
|
reader.start();
|
||||||
// the reader is still blocking, not succeeded yet.
|
// the reader is still blocking, not succeeded yet.
|
||||||
assertFalse(readSucceed.get());
|
assertEquals(0, readStatus.get());
|
||||||
rollEditLogAndTail(0);
|
rollEditLogAndTail(0);
|
||||||
// wait a while for all the change to be done
|
// wait a while for all the change to be done
|
||||||
Thread.sleep(100);
|
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
|
||||||
// the reader should have succeed.
|
// the reader should have succeed.
|
||||||
assertTrue(readSucceed.get());
|
assertEquals(1, readStatus.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setUpCluster(int numObservers) throws Exception {
|
private void setUpCluster(int numObservers) throws Exception {
|
||||||
|
|
|
@ -0,0 +1,335 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link ObserverReadProxyProvider} under various configurations of
|
||||||
|
* NameNode states. Mainly testing that the proxy provider picks the correct
|
||||||
|
* NameNode to communicate with.
|
||||||
|
*/
|
||||||
|
public class TestObserverReadProxyProvider {
|
||||||
|
|
||||||
|
private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0];
|
||||||
|
private String ns;
|
||||||
|
private URI nnURI;
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
|
||||||
|
private ClientProtocolAnswer[] namenodeAnswers;
|
||||||
|
private String[] namenodeAddrs;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
ns = "testcluster";
|
||||||
|
nnURI = URI.create("hdfs://" + ns);
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupProxyProvider(int namenodeCount) throws Exception {
|
||||||
|
String[] namenodeIDs = new String[namenodeCount];
|
||||||
|
namenodeAddrs = new String[namenodeCount];
|
||||||
|
namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
|
||||||
|
ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
|
||||||
|
Map<String, ClientProtocol> proxyMap = new HashMap<>();
|
||||||
|
for (int i = 0; i < namenodeCount; i++) {
|
||||||
|
namenodeIDs[i] = "nn" + i;
|
||||||
|
namenodeAddrs[i] = "namenode" + i + ".test:8020";
|
||||||
|
conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
|
||||||
|
"." + namenodeIDs[i], namenodeAddrs[i]);
|
||||||
|
namenodeAnswers[i] = new ClientProtocolAnswer();
|
||||||
|
proxies[i] = mock(ClientProtocol.class);
|
||||||
|
doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
|
||||||
|
doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
|
||||||
|
proxyMap.put(namenodeAddrs[i], proxies[i]);
|
||||||
|
}
|
||||||
|
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
|
||||||
|
Joiner.on(",").join(namenodeIDs));
|
||||||
|
proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
|
||||||
|
ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
|
||||||
|
@Override
|
||||||
|
public ClientProtocol createProxy(Configuration conf,
|
||||||
|
InetSocketAddress nnAddr, Class<ClientProtocol> xface,
|
||||||
|
UserGroupInformation ugi, boolean withRetries,
|
||||||
|
AtomicBoolean fallbackToSimpleAuth) {
|
||||||
|
return proxyMap.get(nnAddr.toString());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
proxyProvider.setObserverReadEnabled(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadOperationOnObserver() throws Exception {
|
||||||
|
setupProxyProvider(3);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[2].setObserverState();
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteOperationOnActive() throws Exception {
|
||||||
|
setupProxyProvider(3);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[2].setObserverState();
|
||||||
|
|
||||||
|
doWrite();
|
||||||
|
assertHandledBy(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnreachableObserverWithNoBackup() throws Exception {
|
||||||
|
setupProxyProvider(2);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
|
||||||
|
namenodeAnswers[1].setUnreachable(true);
|
||||||
|
// Confirm that read still succeeds even though observer is not available
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnreachableObserverWithMultiple() throws Exception {
|
||||||
|
setupProxyProvider(4);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[2].setObserverState();
|
||||||
|
namenodeAnswers[3].setObserverState();
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(2);
|
||||||
|
|
||||||
|
namenodeAnswers[2].setUnreachable(true);
|
||||||
|
doRead();
|
||||||
|
// Fall back to the second observer node
|
||||||
|
assertHandledBy(3);
|
||||||
|
|
||||||
|
namenodeAnswers[2].setUnreachable(false);
|
||||||
|
doRead();
|
||||||
|
// Current index has changed, so although the first observer is back,
|
||||||
|
// it should continue requesting from the second observer
|
||||||
|
assertHandledBy(3);
|
||||||
|
|
||||||
|
namenodeAnswers[3].setUnreachable(true);
|
||||||
|
doRead();
|
||||||
|
// Now that second is unavailable, go back to using the first observer
|
||||||
|
assertHandledBy(2);
|
||||||
|
|
||||||
|
namenodeAnswers[2].setUnreachable(true);
|
||||||
|
doRead();
|
||||||
|
// Both observers are now unavailable, so it should fall back to active
|
||||||
|
assertHandledBy(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObserverToActive() throws Exception {
|
||||||
|
setupProxyProvider(3);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
namenodeAnswers[2].setObserverState();
|
||||||
|
|
||||||
|
doWrite();
|
||||||
|
assertHandledBy(0);
|
||||||
|
|
||||||
|
// Transition an observer to active
|
||||||
|
namenodeAnswers[0].setStandbyState();
|
||||||
|
namenodeAnswers[1].setActiveState();
|
||||||
|
try {
|
||||||
|
doWrite();
|
||||||
|
fail("Write should fail; failover required");
|
||||||
|
} catch (RemoteException re) {
|
||||||
|
assertEquals(re.getClassName(),
|
||||||
|
StandbyException.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
proxyProvider.performFailover(proxyProvider.getProxy().proxy);
|
||||||
|
doWrite();
|
||||||
|
// After failover, previous observer is now active
|
||||||
|
assertHandledBy(1);
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(2);
|
||||||
|
|
||||||
|
// Transition back to original state but second observer not available
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
namenodeAnswers[2].setUnreachable(true);
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
try {
|
||||||
|
doWrite();
|
||||||
|
fail("Should have failed");
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
proxyProvider.performFailover(proxyProvider.getProxy().proxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
doWrite();
|
||||||
|
assertHandledBy(0);
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObserverToStandby() throws Exception {
|
||||||
|
setupProxyProvider(3);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
namenodeAnswers[2].setObserverState();
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(1);
|
||||||
|
|
||||||
|
namenodeAnswers[1].setStandbyState();
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(2);
|
||||||
|
|
||||||
|
namenodeAnswers[2].setStandbyState();
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(0);
|
||||||
|
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleObserverToStandby() throws Exception {
|
||||||
|
setupProxyProvider(2);
|
||||||
|
namenodeAnswers[0].setActiveState();
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(1);
|
||||||
|
|
||||||
|
namenodeAnswers[1].setStandbyState();
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(0);
|
||||||
|
|
||||||
|
namenodeAnswers[1].setObserverState();
|
||||||
|
// The proxy provider still thinks the second NN is in observer state,
|
||||||
|
// so it will take a second call for it to notice the new observer
|
||||||
|
doRead();
|
||||||
|
doRead();
|
||||||
|
assertHandledBy(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doRead() throws Exception {
|
||||||
|
doRead(proxyProvider.getProxy().proxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doWrite() throws Exception {
|
||||||
|
doWrite(proxyProvider.getProxy().proxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertHandledBy(int namenodeIdx) {
|
||||||
|
assertEquals(namenodeAddrs[namenodeIdx],
|
||||||
|
proxyProvider.getLastProxy().proxyInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void doWrite(ClientProtocol client) throws Exception {
|
||||||
|
client.reportBadBlocks(EMPTY_BLOCKS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void doRead(ClientProtocol client) throws Exception {
|
||||||
|
client.checkAccess("/", FsAction.READ);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
|
||||||
|
* the state or unreachability of this Answer will make the linked
|
||||||
|
* ClientProtocol respond as if it was communicating with a NameNode of
|
||||||
|
* the corresponding state. It is in Standby state by default.
|
||||||
|
*/
|
||||||
|
private static class ClientProtocolAnswer implements Answer<Void> {
|
||||||
|
|
||||||
|
private volatile boolean unreachable = false;
|
||||||
|
// Standby state by default
|
||||||
|
private volatile boolean allowWrites = false;
|
||||||
|
private volatile boolean allowReads = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
if (unreachable) {
|
||||||
|
throw new IOException("Unavailable");
|
||||||
|
}
|
||||||
|
switch (invocationOnMock.getMethod().getName()) {
|
||||||
|
case "reportBadBlocks":
|
||||||
|
if (!allowWrites) {
|
||||||
|
throw new RemoteException(StandbyException.class.getCanonicalName(),
|
||||||
|
"No writes!");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
case "checkAccess":
|
||||||
|
if (!allowReads) {
|
||||||
|
throw new RemoteException(StandbyException.class.getCanonicalName(),
|
||||||
|
"No reads!");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Only reportBadBlocks and checkAccess supported!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void setUnreachable(boolean unreachable) {
|
||||||
|
this.unreachable = unreachable;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setActiveState() {
|
||||||
|
allowReads = true;
|
||||||
|
allowWrites = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setStandbyState() {
|
||||||
|
allowReads = false;
|
||||||
|
allowWrites = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setObserverState() {
|
||||||
|
allowReads = true;
|
||||||
|
allowWrites = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue