diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 4f708a5b280..d9d6f42bcdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@@ -3056,4 +3057,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public void msync() throws IOException {
     namenode.msync();
   }
+
+  /**
+   * A non-blocking call to get the HA service state of the NameNode.
+   *
+   * @return HA state of the NameNode
+   * @throws IOException if the RPC to the NameNode fails
+   */
+  @VisibleForTesting
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    return namenode.getHAServiceState();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index b71e84dc5d1..93227bdc868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -25,16 +25,13 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.ipc.AlignmentContext;
-import org.apache.hadoop.ipc.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -353,34 +350,6 @@ public class NameNodeProxiesClient {
         fallbackToSimpleAuth, null);
   }
 
-  /**
-   * Creates a non-HA proxy object with {@link HAServiceProtocol} to the
-   * given NameNode address, using the provided configuration. The proxy will
-   * use the RPC timeout configuration specified via {@link
-   * org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
-   * Upon failures, this will retry up to certain times with {@link RetryProxy}.
-   *
-   * @param address the NameNode address
-   * @param conf the configuration to be used
-   * @return a non-HA proxy with {@link HAServiceProtocol}.
-   */
-  public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
-      InetSocketAddress address, Configuration conf) throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
-
-    HAServiceProtocol proxy =
-        new HAServiceProtocolClientSideTranslatorPB(
-            address, conf, NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf));
-    return (HAServiceProtocol) RetryProxy.create(
-        HAServiceProtocol.class,
-        new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
-        new HashMap<>(),
-        timeoutPolicy
-    );
-  }
-
   public static ClientProtocol createProxyWithAlignmentContext(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
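
For context: once a client is connected, the new DFSClient#getHAServiceState() can be exercised end to end. Below is a minimal, hypothetical usage sketch; the nameservice URI and the class name are illustrative only and not part of this patch.

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class HAStateProbe {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // "hdfs://mycluster" is a placeholder nameservice URI.
        try (DistributedFileSystem dfs = (DistributedFileSystem)
            FileSystem.get(URI.create("hdfs://mycluster"), conf)) {
          // Asks the currently connected NameNode for its HA state over
          // ClientProtocol; no separate HAServiceProtocol connection needed.
          HAServiceState state = dfs.getClient().getHAServiceState();
          System.out.println("NameNode HA state: " + state);
        }
      }
    }
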
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index bb7092cd21a..9c4f39e13b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -1581,7 +1582,7 @@ public interface ClientProtocol {
    * @throws IOException see specific implementation
    */
   @Idempotent
-  @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call
+  @ReadOnly(isCoordinated = true)
   void checkAccess(String path, FsAction mode) throws IOException;
 
   /**
@@ -1735,6 +1736,16 @@ public interface ClientProtocol {
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
 
+  /**
+   * Get the HA service state of the server.
+   *
+   * @return server HA state
+   * @throws IOException if the RPC to the server fails
+   */
+  @Idempotent
+  @ReadOnly
+  HAServiceProtocol.HAServiceState getHAServiceState() throws IOException;
+
   /**
    * Called by client to wait until the server has reached the state id of the
    * client. The client and server state id are given by client side and server
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 36e8149b4f5..441e039d837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -140,6 +142,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
@@ -1890,4 +1893,28 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    HAServiceStateRequestProto req =
+        HAServiceStateRequestProto.newBuilder().build();
+    try {
+      HAServiceStateProto res =
+          rpcProxy.getHAServiceState(null, req).getState();
+      switch(res) {
+      case ACTIVE:
+        return HAServiceProtocol.HAServiceState.ACTIVE;
+      case STANDBY:
+        return HAServiceProtocol.HAServiceState.STANDBY;
+      case OBSERVER:
+        return HAServiceProtocol.HAServiceState.OBSERVER;
+      case INITIALIZING:
+      default:
+        return HAServiceProtocol.HAServiceState.INITIALIZING;
+      }
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }
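
The translator above maps the protobuf enum to the Java enum with an explicit switch. A hypothetical standalone helper showing the same mapping technique (this utility class does not exist in the patch; it is a sketch of the idiom):

    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;

    final class HAServiceStateMapper {
      private HAServiceStateMapper() {}

      /** Maps the wire enum to the Java enum, defaulting to INITIALIZING. */
      static HAServiceState fromProto(HAServiceStateProto proto) {
        switch (proto) {
        case ACTIVE:
          return HAServiceState.ACTIVE;
        case STANDBY:
          return HAServiceState.STANDBY;
        case OBSERVER:
          return HAServiceState.OBSERVER;
        case INITIALIZING:
        default:
          // Unknown or uninitialized states are treated as INITIALIZING,
          // matching the translator's fall-through behavior.
          return HAServiceState.INITIALIZING;
        }
      }
    }
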
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 1b5ad16dbe2..572cb1ccd37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -28,14 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -122,44 +119,22 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
    */
   private HAServiceState cachedState;
 
-  /** Proxy for getting HA service status from the given NameNode. */
-  private HAServiceProtocol serviceProxy;
-
-  public NNProxyInfo(InetSocketAddress address, Configuration conf) {
+  public NNProxyInfo(InetSocketAddress address) {
     super(null, address.toString());
     this.address = address;
-    try {
-      serviceProxy = NameNodeProxiesClient
-          .createNonHAProxyWithHAServiceProtocol(address, conf);
-    } catch (IOException ioe) {
-      LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
-          " at {}", address, ioe);
-      throw new RuntimeException(ioe);
-    }
   }
 
   public InetSocketAddress getAddress() {
     return address;
   }
 
-  public void refreshCachedState() {
-    try {
-      cachedState = serviceProxy.getServiceStatus().getState();
-    } catch (IOException e) {
-      LOG.warn("Failed to connect to {}. Setting cached state to Standby",
-          address, e);
-      cachedState = HAServiceState.STANDBY;
-    }
+  public void setCachedState(HAServiceState state) {
+    cachedState = state;
   }
 
   public HAServiceState getCachedState() {
     return cachedState;
   }
-
-  @VisibleForTesting
-  public void setServiceProxyForTesting(HAServiceProtocol proxy) {
-    this.serviceProxy = proxy;
-  }
 }
@@ -202,7 +177,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
     Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
     for (InetSocketAddress address : addressesOfNns) {
-      proxies.add(new NNProxyInfo<T>(address, conf));
+      proxies.add(new NNProxyInfo<T>(address));
     }
     // Randomize the list to prevent all clients pointing to the same one
     boolean randomized = getRandomOrder(conf, uri);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index 8062e79d24d..3eb181d5cd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
   public IPFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
     super(conf, uri, xface, factory);
-    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
+    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri));
   }
 
   @Override
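
With the HAServiceProtocol plumbing removed above, NNProxyInfo only caches a state that callers set from the ClientProtocol proxy itself. A minimal sketch of the resulting probe-and-cache pattern (the helper class is hypothetical; the fallback-to-Standby behavior mirrors the ObserverReadProxyProvider change later in this patch):

    import java.io.IOException;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    final class HAStateProber {
      private HAStateProber() {}

      /** Returns the NameNode's HA state, or STANDBY if it is unreachable. */
      static HAServiceState probe(ClientProtocol proxy) {
        try {
          return proxy.getHAServiceState();
        } catch (IOException e) {
          // An unreachable NameNode is conservatively treated as Standby,
          // so providers will skip it when looking for an observer.
          return HAServiceState.STANDBY;
        }
      }
    }
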
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 17bad65ee94..1e85a8e7a0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -211,7 +211,14 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     currentProxy = null;
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.refreshCachedState();
+    try {
+      HAServiceState state = currentProxy.proxy.getHAServiceState();
+      currentProxy.setCachedState(state);
+    } catch (IOException e) {
+      LOG.info("Failed to connect to {}. Setting cached state to Standby",
+          currentProxy.getAddress(), e);
+      currentProxy.setCachedState(HAServiceState.STANDBY);
+    }
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index a5aa082de83..2bbca61e67c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -35,6 +35,7 @@ import "xattr.proto";
 import "encryption.proto";
 import "inotify.proto";
 import "erasurecoding.proto";
+import "HAServiceProtocol.proto";
 
 /**
  * The ClientNamenodeProtocol Service defines the interface between a client
@@ -808,6 +809,13 @@ message MsyncRequestProto {
 message MsyncResponseProto {
 }
 
+message HAServiceStateRequestProto {
+}
+
+message HAServiceStateResponseProto {
+  required hadoop.common.HAServiceStateProto state = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -990,4 +998,6 @@ service ClientNamenodeProtocol {
       returns(ListOpenFilesResponseProto);
   rpc msync(MsyncRequestProto)
       returns(MsyncResponseProto);
+  rpc getHAServiceState(HAServiceStateRequestProto)
+      returns(HAServiceStateResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
index 57db8acfc3a..e0432f5e7ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
@@ -72,7 +72,8 @@ public class TestReadOnly {
           "getCurrentEditLogTxid",
           "getEditsFromTxid",
           "getQuotaUsage",
-          "msync"
+          "msync",
+          "getHAServiceState"
       )
   );
 
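
Since the new RPC is both @Idempotent and @ReadOnly, observer NameNodes can serve it. On the wire it is a trivial exchange: an empty request and a single required enum in the response. A hypothetical sketch that builds the new messages directly (e.g. for a wire-format test), assuming the generated protobuf classes from ClientNamenodeProtocol.proto are on the classpath:

    import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;

    public class HAServiceStateMessageDemo {
      public static void main(String[] args) {
        // The request carries no fields; it is a pure "ask".
        HAServiceStateRequestProto req =
            HAServiceStateRequestProto.newBuilder().build();
        // The response carries exactly one required state field.
        HAServiceStateResponseProto res = HAServiceStateResponseProto
            .newBuilder()
            .setState(HAServiceStateProto.OBSERVER)
            .build();
        System.out.println(req.getSerializedSize() + "-byte request; state = "
            + res.getState());
      }
    }
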
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 47dfc463b48..06edcb4d6e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -2064,7 +2065,13 @@ public class RouterRpcServer extends AbstractService
       OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
-  @Override
+  @Override // ClientProtocol
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    return null;
+  }
+
+  @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     checkOperation(OperationCategory.READ, false);
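
Note that the Router implementation above returns null rather than a concrete state, so a caller that may be talking to a Router instead of a NameNode should tolerate that. A small, hypothetical defensive wrapper (the class name and the choice of INITIALIZING as the default are illustrative assumptions, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.hdfs.DFSClient;

    final class SafeHAState {
      private SafeHAState() {}

      /** Returns the HA state, substituting INITIALIZING when absent. */
      static HAServiceState getOrDefault(DFSClient client) throws IOException {
        HAServiceState state = client.getHAServiceState();
        // RouterRpcServer returns null: per-NameNode HA state is not
        // meaningful behind Router-Based Federation.
        return state == null ? HAServiceState.INITIALIZING : state;
      }
    }
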
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index d5f7b9bdc5f..35845874377 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -150,6 +153,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@@ -1838,4 +1843,35 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public HAServiceStateResponseProto getHAServiceState(
+      RpcController controller,
+      HAServiceStateRequestProto request) throws ServiceException {
+    try {
+      HAServiceProtocol.HAServiceState state = server.getHAServiceState();
+      HAServiceStateProto retState;
+      switch (state) {
+      case ACTIVE:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.ACTIVE;
+        break;
+      case STANDBY:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.STANDBY;
+        break;
+      case OBSERVER:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.OBSERVER;
+        break;
+      case INITIALIZING:
+      default:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.INITIALIZING;
+        break;
+      }
+      HAServiceStateResponseProto.Builder builder =
+          HAServiceStateResponseProto.newBuilder();
+      builder.setState(retState);
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
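
The server-side switch above is the inverse of the client-side translator's mapping, and both rely on the two enums staying name-compatible. A hypothetical sanity check of that assumption (protobuf's generated Java enums support valueOf(String), which throws if a constant is missing):

    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;

    public class EnumParityCheck {
      public static void main(String[] args) {
        for (HAServiceState s : HAServiceState.values()) {
          // Throws IllegalArgumentException if the protobuf enum lacks a
          // same-named constant.
          HAServiceStateProto.valueOf(s.name());
        }
        System.out.println("HAServiceState and HAServiceStateProto agree");
      }
    }
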
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 5d8e12bd4fc..644a480e8dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1345,6 +1345,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     // TODO : need to be filled up if needed. May be a no-op here.
   }
 
+  @Override // ClientProtocol
+  public HAServiceState getHAServiceState() throws IOException {
+    checkNNStartup();
+    return nn.getServiceStatus().getState();
+  }
+
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 3048842f369..e1fadafdee7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -89,6 +89,9 @@ public class TestConsistentReadsObserver {
     // 0 == not completed, 1 == succeeded, -1 == failed
     AtomicInteger readStatus = new AtomicInteger(0);
 
+    // Make an uncoordinated call, which initializes the proxy
+    // to the Observer node.
+    dfs.getClient().getHAServiceState();
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 
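
The test above leans on getHAServiceState being an uncoordinated read: it can be issued before any coordinated call to make the ObserverReadProxyProvider select and probe a proxy without waiting on state-id alignment. A hypothetical warm-up helper capturing the same trick (the class and method names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ProxyWarmup {
      /** Forces proxy creation and HA-state discovery as a side effect. */
      static void warmUp(DistributedFileSystem dfs) throws IOException {
        dfs.getClient().getHAServiceState();
      }
    }
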
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index dfd8488301b..caf7d003eac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -47,7 +45,6 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -79,8 +76,6 @@ public class TestObserverReadProxyProvider {
     namenodeAnswers = new NameNodeAnswer[namenodeCount];
     ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
     Map<String, ClientProtocol> proxyMap = new HashMap<>();
-    HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
-    Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
     for (int i = 0; i < namenodeCount; i++) {
       namenodeIDs[i] = "nn" + i;
       namenodeAddrs[i] = "namenode" + i + ".test:8020";
@@ -92,11 +87,9 @@ public class TestObserverReadProxyProvider {
           .when(proxies[i]));
       doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
          .when(proxies[i]));
-      serviceProxies[i] = mock(HAServiceProtocol.class);
-      Mockito.doAnswer(namenodeAnswers[i].serviceAnswer)
-          .when(serviceProxies[i]).getServiceStatus();
+      Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
+          .when(proxies[i]).getHAServiceState();
       proxyMap.put(namenodeAddrs[i], proxies[i]);
-      serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
     }
     conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
         Joiner.on(",").join(namenodeIDs));
@@ -116,10 +109,6 @@ public class TestObserverReadProxyProvider {
         URI uri, String addressKey) {
       List<NNProxyInfo<ClientProtocol>> nnProxies =
           super.getProxyAddresses(uri, addressKey);
-      for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
-        String addressStr = nnProxy.getAddress().toString();
-        nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
-      }
       return nnProxies;
     }
   };
@@ -322,8 +311,8 @@ public class TestObserverReadProxyProvider {
   }
 
   /**
-   * An {@link Answer} used for mocking of {@link ClientProtocol} and
-   * {@link HAServiceProtocol}. Setting the state or unreachability of this
+   * An {@link Answer} used for mocking of {@link ClientProtocol}.
+   * Setting the state or unreachability of this
    * Answer will make the linked ClientProtocol respond as if it was
    * communicating with a NameNode of the corresponding state. It is in Standby
    * state by default.
@@ -338,31 +327,29 @@
     private volatile boolean allowReads = false;
 
     private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
-    private HAServiceProtocolAnswer serviceAnswer =
-        new HAServiceProtocolAnswer();
 
-    private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
+    private class ClientProtocolAnswer implements Answer<Object> {
       @Override
-      public HAServiceStatus answer(InvocationOnMock invocation)
-          throws Throwable {
-        HAServiceStatus status = mock(HAServiceStatus.class);
-        if (allowReads && allowWrites) {
-          when(status.getState()).thenReturn(HAServiceState.ACTIVE);
-        } else if (allowReads) {
-          when(status.getState()).thenReturn(HAServiceState.OBSERVER);
-        } else {
-          when(status.getState()).thenReturn(HAServiceState.STANDBY);
-        }
-        return status;
-      }
-    }
-
-    private class ClientProtocolAnswer implements Answer<Void> {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         if (unreachable) {
           throw new IOException("Unavailable");
         }
+        // Ideally, retryActive would be checked before getHAServiceState.
+        // getHAServiceState is checked first here only because, in these
+        // tests, the read calls rely on getHAServiceState having already
+        // succeeded. This may be revisited in the future.
+        if (invocationOnMock.getMethod()
+            .getName().equals("getHAServiceState")) {
+          HAServiceState status;
+          if (allowReads && allowWrites) {
+            status = HAServiceState.ACTIVE;
+          } else if (allowReads) {
+            status = HAServiceState.OBSERVER;
+          } else {
+            status = HAServiceState.STANDBY;
+          }
+          return status;
+        }
         if (retryActive) {
           throw new RemoteException(
               ObserverRetryOnActiveException.class.getCanonicalName(),