HDFS-14035. NN status discovery does not leverage delegation token. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2018-11-14 13:32:13 -08:00
parent 4ce7f9f2e6
commit 683daedc1f
14 changed files with 152 additions and 100 deletions

View File

@ -91,6 +91,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@ -3056,4 +3057,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void msync() throws IOException { public void msync() throws IOException {
namenode.msync(); namenode.msync();
} }
/**
* An unblocking call to get the HA service state of NameNode.
*
* @return HA state of NameNode
* @throws IOException
*/
@VisibleForTesting
public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException {
return namenode.getHAServiceState();
}
} }

View File

@ -25,16 +25,13 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -353,34 +350,6 @@ public class NameNodeProxiesClient {
fallbackToSimpleAuth, null); fallbackToSimpleAuth, null);
} }
/**
* Creates a non-HA proxy object with {@link HAServiceProtocol} to the
* given NameNode address, using the provided configuration. The proxy will
* use the RPC timeout configuration specified via {@link
* org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
* Upon failures, this will retry up to certain times with {@link RetryProxy}.
*
* @param address the NameNode address
* @param conf the configuration to be used
* @return a non-HA proxy with {@link HAServiceProtocol}.
*/
public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
InetSocketAddress address, Configuration conf) throws IOException {
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
HAServiceProtocol proxy =
new HAServiceProtocolClientSideTranslatorPB(
address, conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf));
return (HAServiceProtocol) RetryProxy.create(
HAServiceProtocol.class,
new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
new HashMap<>(),
timeoutPolicy
);
}
public static ClientProtocol createProxyWithAlignmentContext( public static ClientProtocol createProxyWithAlignmentContext(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi, InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth, boolean withRetries, AtomicBoolean fallbackToSimpleAuth,

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
@ -1581,7 +1582,7 @@ public interface ClientProtocol {
* @throws IOException see specific implementation * @throws IOException see specific implementation
*/ */
@Idempotent @Idempotent
@ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call @ReadOnly(isCoordinated = true)
void checkAccess(String path, FsAction mode) throws IOException; void checkAccess(String path, FsAction mode) throws IOException;
/** /**
@ -1735,6 +1736,16 @@ public interface ClientProtocol {
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException; EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
/**
* Get HA service state of the server.
*
* @return server HA state
* @throws IOException
*/
@Idempotent
@ReadOnly
HAServiceProtocol.HAServiceState getHAServiceState() throws IOException;
/** /**
* Called by client to wait until the server has reached the state id of the * Called by client to wait until the server has reached the state id of the
* client. The client and server state id are given by client side and server * client. The client and server state id are given by client side and server

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@ -140,6 +142,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
@ -1890,4 +1893,28 @@ public class ClientNamenodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override
public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException {
HAServiceStateRequestProto req =
HAServiceStateRequestProto.newBuilder().build();
try {
HAServiceStateProto res =
rpcProxy.getHAServiceState(null, req).getState();
switch(res) {
case ACTIVE:
return HAServiceProtocol.HAServiceState.ACTIVE;
case STANDBY:
return HAServiceProtocol.HAServiceState.STANDBY;
case OBSERVER:
return HAServiceProtocol.HAServiceState.OBSERVER;
case INITIALIZING:
default:
return HAServiceProtocol.HAServiceState.INITIALIZING;
}
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
} }

View File

@ -28,14 +28,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -122,44 +119,22 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
*/ */
private HAServiceState cachedState; private HAServiceState cachedState;
/** Proxy for getting HA service status from the given NameNode. */ public NNProxyInfo(InetSocketAddress address) {
private HAServiceProtocol serviceProxy;
public NNProxyInfo(InetSocketAddress address, Configuration conf) {
super(null, address.toString()); super(null, address.toString());
this.address = address; this.address = address;
try {
serviceProxy = NameNodeProxiesClient
.createNonHAProxyWithHAServiceProtocol(address, conf);
} catch (IOException ioe) {
LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
" at {}", address, ioe);
throw new RuntimeException(ioe);
}
} }
public InetSocketAddress getAddress() { public InetSocketAddress getAddress() {
return address; return address;
} }
public void refreshCachedState() { public void setCachedState(HAServiceState state) {
try { cachedState = state;
cachedState = serviceProxy.getServiceStatus().getState();
} catch (IOException e) {
LOG.warn("Failed to connect to {}. Setting cached state to Standby",
address, e);
cachedState = HAServiceState.STANDBY;
}
} }
public HAServiceState getCachedState() { public HAServiceState getCachedState() {
return cachedState; return cachedState;
} }
@VisibleForTesting
public void setServiceProxyForTesting(HAServiceProtocol proxy) {
this.serviceProxy = proxy;
}
} }
@Override @Override
@ -202,7 +177,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values(); Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
for (InetSocketAddress address : addressesOfNns) { for (InetSocketAddress address : addressesOfNns) {
proxies.add(new NNProxyInfo<T>(address, conf)); proxies.add(new NNProxyInfo<T>(address));
} }
// Randomize the list to prevent all clients pointing to the same one // Randomize the list to prevent all clients pointing to the same one
boolean randomized = getRandomOrder(conf, uri); boolean randomized = getRandomOrder(conf, uri);

View File

@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
public IPFailoverProxyProvider(Configuration conf, URI uri, public IPFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) { Class<T> xface, HAProxyFactory<T> factory) {
super(conf, uri, xface, factory); super(conf, uri, xface, factory);
this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf); this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri));
} }
@Override @Override

View File

@ -211,7 +211,14 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
currentProxy = null; currentProxy = null;
currentIndex = (currentIndex + 1) % nameNodeProxies.size(); currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex)); currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
currentProxy.refreshCachedState(); try {
HAServiceState state = currentProxy.proxy.getHAServiceState();
currentProxy.setCachedState(state);
} catch (IOException e) {
LOG.info("Failed to connect to {}. Setting cached state to Standby",
currentProxy.getAddress(), e);
currentProxy.setCachedState(HAServiceState.STANDBY);
}
LOG.debug("Changed current proxy from {} to {}", LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo, initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo); currentProxy.proxyInfo);

View File

@ -35,6 +35,7 @@ import "xattr.proto";
import "encryption.proto"; import "encryption.proto";
import "inotify.proto"; import "inotify.proto";
import "erasurecoding.proto"; import "erasurecoding.proto";
import "HAServiceProtocol.proto";
/** /**
* The ClientNamenodeProtocol Service defines the interface between a client * The ClientNamenodeProtocol Service defines the interface between a client
@ -808,6 +809,13 @@ message MsyncRequestProto {
message MsyncResponseProto { message MsyncResponseProto {
} }
message HAServiceStateRequestProto {
}
message HAServiceStateResponseProto {
required hadoop.common.HAServiceStateProto state = 1;
}
service ClientNamenodeProtocol { service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto) rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto); returns(GetBlockLocationsResponseProto);
@ -990,4 +998,6 @@ service ClientNamenodeProtocol {
returns(ListOpenFilesResponseProto); returns(ListOpenFilesResponseProto);
rpc msync(MsyncRequestProto) rpc msync(MsyncRequestProto)
returns(MsyncResponseProto); returns(MsyncResponseProto);
rpc getHAServiceState(HAServiceStateRequestProto)
returns(HAServiceStateResponseProto);
} }

View File

@ -72,7 +72,8 @@ public class TestReadOnly {
"getCurrentEditLogTxid", "getCurrentEditLogTxid",
"getEditsFromTxid", "getEditsFromTxid",
"getQuotaUsage", "getQuotaUsage",
"msync" "msync",
"getHAServiceState"
) )
); );

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
@ -2064,7 +2065,13 @@ public class RouterRpcServer extends AbstractService
OpenFilesIterator.FILTER_PATH_DEFAULT); OpenFilesIterator.FILTER_PATH_DEFAULT);
} }
@Override @Override // ClientProtocol
public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException {
return null;
}
@Override // ClientProtocol
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId, public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException { EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
checkOperation(OperationCategory.READ, false); checkOperation(OperationCategory.READ, false);

View File

@ -30,6 +30,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -150,6 +153,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@ -1838,4 +1843,35 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@Override
public HAServiceStateResponseProto getHAServiceState(
RpcController controller,
HAServiceStateRequestProto request) throws ServiceException {
try {
HAServiceProtocol.HAServiceState state = server.getHAServiceState();
HAServiceStateProto retState;
switch (state) {
case ACTIVE:
retState = HAServiceProtocolProtos.HAServiceStateProto.ACTIVE;
break;
case STANDBY:
retState = HAServiceProtocolProtos.HAServiceStateProto.STANDBY;
break;
case OBSERVER:
retState = HAServiceProtocolProtos.HAServiceStateProto.OBSERVER;
break;
case INITIALIZING:
default:
retState = HAServiceProtocolProtos.HAServiceStateProto.INITIALIZING;
break;
}
HAServiceStateResponseProto.Builder builder =
HAServiceStateResponseProto.newBuilder();
builder.setState(retState);
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
} }

View File

@ -1345,6 +1345,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
// TODO : need to be filled up if needed. May be a no-op here. // TODO : need to be filled up if needed. May be a no-op here.
} }
@Override // ClientProtocol
public HAServiceState getHAServiceState() throws IOException {
checkNNStartup();
return nn.getServiceStatus().getState();
}
@Override // ClientProtocol @Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException { throws IOException {

View File

@ -89,6 +89,9 @@ public class TestConsistentReadsObserver {
// 0 == not completed, 1 == succeeded, -1 == failed // 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0); AtomicInteger readStatus = new AtomicInteger(0);
// Making an uncoordinated call, which initialize the proxy
// to Observer node.
dfs.getClient().getHAServiceState();
dfs.mkdir(testPath, FsPermission.getDefault()); dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0); assertSentTo(0);

View File

@ -27,8 +27,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -47,7 +45,6 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* Tests for {@link ObserverReadProxyProvider} under various configurations of * Tests for {@link ObserverReadProxyProvider} under various configurations of
@ -79,8 +76,6 @@ public class TestObserverReadProxyProvider {
namenodeAnswers = new NameNodeAnswer[namenodeCount]; namenodeAnswers = new NameNodeAnswer[namenodeCount];
ClientProtocol[] proxies = new ClientProtocol[namenodeCount]; ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
Map<String, ClientProtocol> proxyMap = new HashMap<>(); Map<String, ClientProtocol> proxyMap = new HashMap<>();
HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
for (int i = 0; i < namenodeCount; i++) { for (int i = 0; i < namenodeCount; i++) {
namenodeIDs[i] = "nn" + i; namenodeIDs[i] = "nn" + i;
namenodeAddrs[i] = "namenode" + i + ".test:8020"; namenodeAddrs[i] = "namenode" + i + ".test:8020";
@ -92,11 +87,9 @@ public class TestObserverReadProxyProvider {
.when(proxies[i])); .when(proxies[i]));
doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer) doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
.when(proxies[i])); .when(proxies[i]));
serviceProxies[i] = mock(HAServiceProtocol.class); Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
Mockito.doAnswer(namenodeAnswers[i].serviceAnswer) .when(proxies[i]).getHAServiceState();
.when(serviceProxies[i]).getServiceStatus();
proxyMap.put(namenodeAddrs[i], proxies[i]); proxyMap.put(namenodeAddrs[i], proxies[i]);
serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
} }
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
Joiner.on(",").join(namenodeIDs)); Joiner.on(",").join(namenodeIDs));
@ -116,10 +109,6 @@ public class TestObserverReadProxyProvider {
URI uri, String addressKey) { URI uri, String addressKey) {
List<NNProxyInfo<ClientProtocol>> nnProxies = List<NNProxyInfo<ClientProtocol>> nnProxies =
super.getProxyAddresses(uri, addressKey); super.getProxyAddresses(uri, addressKey);
for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
String addressStr = nnProxy.getAddress().toString();
nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
}
return nnProxies; return nnProxies;
} }
}; };
@ -322,8 +311,8 @@ public class TestObserverReadProxyProvider {
} }
/** /**
* An {@link Answer} used for mocking of {@link ClientProtocol} and * An {@link Answer} used for mocking of {@link ClientProtocol}.
* {@link HAServiceProtocol}. Setting the state or unreachability of this * Setting the state or unreachability of this
* Answer will make the linked ClientProtocol respond as if it was * Answer will make the linked ClientProtocol respond as if it was
* communicating with a NameNode of the corresponding state. It is in Standby * communicating with a NameNode of the corresponding state. It is in Standby
* state by default. * state by default.
@ -338,31 +327,29 @@ public class TestObserverReadProxyProvider {
private volatile boolean allowReads = false; private volatile boolean allowReads = false;
private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer(); private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
private HAServiceProtocolAnswer serviceAnswer =
new HAServiceProtocolAnswer();
private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> { private class ClientProtocolAnswer implements Answer<Object> {
@Override @Override
public HAServiceStatus answer(InvocationOnMock invocation) public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
throws Throwable {
HAServiceStatus status = mock(HAServiceStatus.class);
if (allowReads && allowWrites) {
when(status.getState()).thenReturn(HAServiceState.ACTIVE);
} else if (allowReads) {
when(status.getState()).thenReturn(HAServiceState.OBSERVER);
} else {
when(status.getState()).thenReturn(HAServiceState.STANDBY);
}
return status;
}
}
private class ClientProtocolAnswer implements Answer<Void> {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
if (unreachable) { if (unreachable) {
throw new IOException("Unavailable"); throw new IOException("Unavailable");
} }
// retryActive should be checked before getHAServiceState.
// Check getHAServiceState first here only because in test,
// it relies read call, which relies on getHAServiceState
// to have passed already. May revisit future.
if (invocationOnMock.getMethod()
.getName().equals("getHAServiceState")) {
HAServiceState status;
if (allowReads && allowWrites) {
status = HAServiceState.ACTIVE;
} else if (allowReads) {
status = HAServiceState.OBSERVER;
} else {
status = HAServiceState.STANDBY;
}
return status;
}
if (retryActive) { if (retryActive) {
throw new RemoteException( throw new RemoteException(
ObserverRetryOnActiveException.class.getCanonicalName(), ObserverRetryOnActiveException.class.getCanonicalName(),