HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen.

(cherry picked from 5847e00143)
(cherry picked from 6630c9b75d)
This commit is contained in:
Erik Krogen 2019-04-17 14:38:24 -07:00 committed by Erik Krogen
parent 8bb98076be
commit 9fdb849e03
5 changed files with 83 additions and 17 deletions

View File

@ -115,7 +115,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
/** /**
* The currently known state of the NameNode represented by this ProxyInfo. * The currently known state of the NameNode represented by this ProxyInfo.
* This may be out of date if the NameNode has changed state since the last * This may be out of date if the NameNode has changed state since the last
* time the state was checked. * time the state was checked. If the NameNode could not be contacted, this
* will store null to indicate an unknown state.
*/ */
private HAServiceState cachedState; private HAServiceState cachedState;

View File

@ -66,7 +66,7 @@ import com.google.common.annotations.VisibleForTesting;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ObserverReadProxyProvider<T extends ClientProtocol> public class ObserverReadProxyProvider<T>
extends AbstractNNFailoverProxyProvider<T> { extends AbstractNNFailoverProxyProvider<T> {
@VisibleForTesting @VisibleForTesting
static final Logger LOG = LoggerFactory.getLogger( static final Logger LOG = LoggerFactory.getLogger(
@ -189,7 +189,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS); AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
// TODO : make this configurable or remove this variable // TODO : make this configurable or remove this variable
if (wrappedProxy instanceof ClientProtocol) {
this.observerReadEnabled = true; this.observerReadEnabled = true;
} else {
LOG.info("Disabling observer reads for {} because the requested proxy "
+ "class does not implement {}", uri, ClientProtocol.class.getName());
this.observerReadEnabled = false;
}
} }
public AlignmentContext getAlignmentContext() { public AlignmentContext getAlignmentContext() {
@ -267,7 +273,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) { private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
IOException ioe; IOException ioe;
try { try {
return proxyInfo.proxy.getHAServiceState(); return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState();
} catch (RemoteException re) { } catch (RemoteException re) {
// Though a Standby will allow a getHAServiceState call, it won't allow // Though a Standby will allow a getHAServiceState call, it won't allow
// delegation token lookup, so if DT is used it throws StandbyException // delegation token lookup, so if DT is used it throws StandbyException
@ -284,7 +290,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
LOG.debug("Failed to connect to {} while fetching HAServiceState", LOG.debug("Failed to connect to {} while fetching HAServiceState",
proxyInfo.getAddress(), ioe); proxyInfo.getAddress(), ioe);
} }
return HAServiceState.STANDBY; return null;
}
/**
* Return the input proxy, cast as a {@link ClientProtocol}. This catches any
* {@link ClassCastException} and wraps it in a more helpful message. This
* should ONLY be called if the caller is certain that the proxy is, in fact,
* a {@link ClientProtocol}.
*/
private ClientProtocol getProxyAsClientProtocol(T proxy) {
assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy "
+ "of class " + proxy.getClass() + " as if it was a ClientProtocol.";
return (ClientProtocol) proxy;
} }
/** /**
@ -299,7 +317,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
if (msynced) { if (msynced) {
return; // No need for an msync return; // No need for an msync
} }
failoverProxy.getProxy().proxy.msync(); getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
msynced = true; msynced = true;
lastMsyncTimeMs = Time.monotonicNow(); lastMsyncTimeMs = Time.monotonicNow();
} }
@ -315,7 +333,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private void autoMsyncIfNecessary() throws IOException { private void autoMsyncIfNecessary() throws IOException {
if (autoMsyncPeriodMs == 0) { if (autoMsyncPeriodMs == 0) {
// Always msync // Always msync
failoverProxy.getProxy().proxy.msync(); getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
} else if (autoMsyncPeriodMs > 0) { } else if (autoMsyncPeriodMs > 0) {
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
synchronized (this) { synchronized (this) {
@ -324,7 +342,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// Re-check the entry criterion since the status may have changed // Re-check the entry criterion since the status may have changed
// while waiting for the lock. // while waiting for the lock.
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) { if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
failoverProxy.getProxy().proxy.msync(); getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
lastMsyncTimeMs = Time.monotonicNow(); lastMsyncTimeMs = Time.monotonicNow();
} }
} }
@ -363,6 +381,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
int failedObserverCount = 0; int failedObserverCount = 0;
int activeCount = 0; int activeCount = 0;
int standbyCount = 0; int standbyCount = 0;
int unreachableCount = 0;
for (int i = 0; i < nameNodeProxies.size(); i++) { for (int i = 0; i < nameNodeProxies.size(); i++) {
NNProxyInfo<T> current = getCurrentProxy(); NNProxyInfo<T> current = getCurrentProxy();
HAServiceState currState = current.getCachedState(); HAServiceState currState = current.getCachedState();
@ -371,9 +390,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
activeCount++; activeCount++;
} else if (currState == HAServiceState.STANDBY) { } else if (currState == HAServiceState.STANDBY) {
standbyCount++; standbyCount++;
} else if (currState == null) {
unreachableCount++;
} }
LOG.debug("Skipping proxy {} for {} because it is in state {}", LOG.debug("Skipping proxy {} for {} because it is in state {}",
current.proxyInfo, method.getName(), currState); current.proxyInfo, method.getName(),
currState == null ? "unreachable" : currState);
changeProxy(current); changeProxy(current);
continue; continue;
} }
@ -420,10 +442,10 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// be that there is simply no Observer node running at all. // be that there is simply no Observer node running at all.
if (failedObserverCount > 0) { if (failedObserverCount > 0) {
// If we get here, it means all observers have failed. // If we get here, it means all observers have failed.
LOG.warn("{} observers have failed for read request {}; " LOG.warn("{} observers have failed for read request {}; also found "
+ "also found {} standby, {} active. " + "{} standby, {} active, and {} unreachable. Falling back "
+ "Falling back to active.", failedObserverCount, + "to active.", failedObserverCount, method.getName(),
method.getName(), standbyCount, activeCount); standbyCount, activeCount, unreachableCount);
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Read falling back to active without observer read " LOG.debug("Read falling back to active without observer read "
@ -432,8 +454,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
} }
} }
// Either all observers have failed, or that it is a write request. // Either all observers have failed, observer reads are disabled,
// In either case, we'll forward the request to active NameNode. // or this is a write request. In any case, forward the request to
// the active NameNode.
LOG.debug("Using failoverProxy to service {}", method.getName()); LOG.debug("Using failoverProxy to service {}", method.getName());
ProxyInfo<T> activeProxy = failoverProxy.getProxy(); ProxyInfo<T> activeProxy = failoverProxy.getProxy();
try { try {
@ -455,7 +478,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
@Override @Override
public ConnectionId getConnectionId() { public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(getCurrentProxy().proxy); return RPC.getConnectionIdForProxy(observerReadEnabled
? getCurrentProxy().proxy : failoverProxy.getProxy().proxy);
} }
} }

View File

@ -152,7 +152,7 @@ public class TestDelegationTokensWithHA {
cluster.shutdownNameNode(0); cluster.shutdownNameNode(0);
logCapture.clearOutput(); logCapture.clearOutput();
dfs.access(new Path("/"), FsAction.READ); dfs.access(new Path("/"), FsAction.READ);
assertTrue(logCapture.getOutput().contains("Assuming Standby state")); assertTrue(logCapture.getOutput().contains("Failed to connect to"));
} finally { } finally {
logCapture.stopCapturing(); logCapture.stopCapturing();
} }

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -115,6 +116,17 @@ public class TestObserverNode {
fail("active cannot be transitioned to observer"); fail("active cannot be transitioned to observer");
} }
/**
* Test that non-ClientProtocol proxies such as
* {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work
* when run in an environment with observers.
*/
@Test
public void testGetGroups() throws Exception {
GetGroups getGroups = new GetGroups(conf);
assertEquals(0, getGroups.run(new String[0]));
}
@Test @Test
public void testNoObserverToActive() throws Exception { public void testNoObserverToActive() throws Exception {
try { try {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -42,9 +43,12 @@ import org.mockito.stubbing.Answer;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* Tests for {@link ObserverReadProxyProvider} under various configurations of * Tests for {@link ObserverReadProxyProvider} under various configurations of
@ -115,6 +119,31 @@ public class TestObserverReadProxyProvider {
proxyProvider.setObserverReadEnabled(true); proxyProvider.setObserverReadEnabled(true);
} }
@Test
public void testWithNonClientProxy() throws Exception {
setupProxyProvider(2); // This will initialize all of the instance fields
final String fakeUser = "fakeUser";
final String[] fakeGroups = {"fakeGroup"};
HAProxyFactory<GetUserMappingsProtocol> proxyFactory =
new NameNodeHAProxyFactory<GetUserMappingsProtocol>() {
@Override
public GetUserMappingsProtocol createProxy(Configuration config,
InetSocketAddress addr, Class<GetUserMappingsProtocol> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
GetUserMappingsProtocol proxy =
mock(GetUserMappingsProtocol.class);
when(proxy.getGroupsForUser(fakeUser)).thenReturn(fakeGroups);
return proxy;
}
};
ObserverReadProxyProvider<GetUserMappingsProtocol> userProxyProvider =
new ObserverReadProxyProvider<>(conf, nnURI,
GetUserMappingsProtocol.class, proxyFactory);
assertArrayEquals(fakeGroups,
userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser));
}
@Test @Test
public void testReadOperationOnObserver() throws Exception { public void testReadOperationOnObserver() throws Exception {
setupProxyProvider(3); setupProxyProvider(3);