diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index ffb8bce52a5..ce7150d445e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -223,8 +223,7 @@ public RpcClient(Configuration conf) throws IOException { retryInterval = OzoneUtils.getTimeDurationInMS(conf, OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT); - dtService = - getOMProxyProvider().getProxy().getDelegationTokenService(); + dtService = getOMProxyProvider().getCurrentProxyDelegationToken(); boolean isUnsafeByteOperationsEnabled = conf.getBoolean( OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java index 481f221ae33..0e5c3c6a883 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java @@ -30,7 +30,6 @@ import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +58,8 @@ public class OMFailoverProxyProvider implements LoggerFactory.getLogger(OMFailoverProxyProvider.class); // Map of OMNodeID to its proxy - private Map omProxies; + private Map> omProxies; + private Map omProxyInfos; private List omNodeIDList; private String currentProxyOMNodeId; @@ -80,33 +80,9 @@ public OMFailoverProxyProvider(OzoneConfiguration configuration, currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex); } - /** - * Class to store proxy information. - */ - public class OMProxyInfo - extends FailoverProxyProvider.ProxyInfo { - private InetSocketAddress address; - private Text dtService; - - OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr, - Text dtService, - InetSocketAddress addr) { - super(proxy, proxyInfoStr); - this.address = addr; - this.dtService = dtService; - } - - public InetSocketAddress getAddress() { - return address; - } - - public Text getDelegationTokenService() { - return dtService; - } - } - private void loadOMClientConfigs(Configuration config) throws IOException { this.omProxies = new HashMap<>(); + this.omProxyInfos = new HashMap<>(); this.omNodeIDList = new ArrayList<>(); Collection omServiceIds = config.getTrimmedStringCollection( @@ -130,25 +106,21 @@ private void loadOMClientConfigs(Configuration config) throws IOException { continue; } - InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr); + OMProxyInfo omProxyInfo = new OMProxyInfo(nodeId, rpcAddrStr); - // Add the OM client proxy info to list of proxies - if (addr != null) { - Text dtService = SecurityUtil.buildTokenService(addr); - StringBuilder proxyInfo = new StringBuilder() - .append(nodeId).append("(") - .append(NetUtils.getHostPortString(addr)).append(")"); - OMProxyInfo omProxyInfo = new OMProxyInfo(null, - proxyInfo.toString(), dtService, addr); + if (omProxyInfo.getAddress() != null) { + + ProxyInfo proxyInfo = + new ProxyInfo(null, omProxyInfo.toString()); // For a non-HA OM setup, nodeId might be null. If so, we assign it // a dummy value if (nodeId == null) { nodeId = OzoneConsts.OM_NODE_ID_DUMMY; } - omProxies.put(nodeId, omProxyInfo); + omProxies.put(nodeId, proxyInfo); + omProxyInfos.put(nodeId, omProxyInfo); omNodeIDList.add(nodeId); - } else { LOG.error("Failed to create OM proxy for {} at address {}", nodeId, rpcAddrStr); @@ -183,26 +155,31 @@ private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress) * @return the OM proxy object to invoke methods upon */ @Override - public synchronized OMProxyInfo getProxy() { - OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId); - createOMProxyIfNeeded(currentOMProxyInfo); - return currentOMProxyInfo; + public synchronized ProxyInfo getProxy() { + ProxyInfo currentProxyInfo = omProxies.get(currentProxyOMNodeId); + createOMProxyIfNeeded(currentProxyInfo, currentProxyOMNodeId); + return currentProxyInfo; } /** - * Creates OM proxy object if it does not already exist. + * Creates proxy object if it does not already exist. */ - private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) { + private void createOMProxyIfNeeded(ProxyInfo proxyInfo, + String nodeId) { if (proxyInfo.proxy == null) { + InetSocketAddress address = omProxyInfos.get(nodeId).getAddress(); try { - proxyInfo.proxy = createOMProxy(proxyInfo.address); + proxyInfo.proxy = createOMProxy(address); } catch (IOException ioe) { LOG.error("{} Failed to create RPC proxy to OM at {}", - this.getClass().getSimpleName(), proxyInfo.address, ioe); + this.getClass().getSimpleName(), address, ioe); throw new RuntimeException(ioe); } } - return proxyInfo; + } + + public synchronized Text getCurrentProxyDelegationToken() { + return omProxyInfos.get(currentProxyOMNodeId).getDelegationTokenService(); } /** @@ -269,7 +246,7 @@ synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) { */ @Override public synchronized void close() throws IOException { - for (OMProxyInfo proxy : omProxies.values()) { + for (ProxyInfo proxy : omProxies.values()) { OzoneManagerProtocolPB omProxy = proxy.proxy; if (omProxy != null) { RPC.stopProxy(omProxy); @@ -278,8 +255,13 @@ public synchronized void close() throws IOException { } @VisibleForTesting - public List getOMProxies() { - return new ArrayList<>(omProxies.values()); + public List getOMProxies() { + return new ArrayList(omProxies.values()); + } + + @VisibleForTesting + public List getOMProxyInfos() { + return new ArrayList(omProxyInfos.values()); } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java new file mode 100644 index 00000000000..b429ca0044e --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; + +import java.net.InetSocketAddress; + +/** + * Class to store OM proxy information. + */ +public class OMProxyInfo { + private String nodeId; + private String rpcAddrStr; + private InetSocketAddress rpcAddr; + private Text dtService; + + OMProxyInfo(String nodeID, String rpcAddress) { + this.nodeId = nodeID; + this.rpcAddrStr = rpcAddress; + this.rpcAddr = NetUtils.createSocketAddr(rpcAddrStr); + this.dtService = SecurityUtil.buildTokenService(rpcAddr); + } + + public String toString() { + StringBuilder sb = new StringBuilder() + .append("nodeId=") + .append(nodeId) + .append(",nodeAddress=") + .append(rpcAddrStr); + return sb.toString(); + } + + public InetSocketAddress getAddress() { + return rpcAddr; + } + + public Text getDelegationTokenService() { + return dtService; + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index d24b3da5d2b..868e04a83b9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -76,6 +76,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; +import org.apache.hadoop.ozone.om.ha.OMProxyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -205,8 +206,7 @@ public static void setScmId(String scmId){ public void testOMClientProxyProvider() { OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy() .getOMProxyProvider(); - List omProxies = - omFailoverProxyProvider.getOMProxies(); + List omProxies = omFailoverProxyProvider.getOMProxyInfos(); // For a non-HA OM service, there should be only one OM proxy. Assert.assertEquals(1, omProxies.size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 44dcee48157..05c53b313df 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -52,8 +52,10 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; +import org.apache.hadoop.ozone.om.ha.OMProxyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; +import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.OzoneVolume; @@ -604,8 +606,8 @@ public void testOMProxyProviderInitialization() throws Exception { OzoneClient rpcClient = cluster.getRpcClient(); OMFailoverProxyProvider omFailoverProxyProvider = rpcClient.getObjectStore().getClientProxy().getOMProxyProvider(); - List omProxies = - omFailoverProxyProvider.getOMProxies(); + List omProxies = + omFailoverProxyProvider.getOMProxyInfos(); Assert.assertEquals(numOfOMs, omProxies.size()); @@ -613,7 +615,7 @@ public void testOMProxyProviderInitialization() throws Exception { InetSocketAddress omRpcServerAddr = cluster.getOzoneManager(i).getOmRpcServerAddr(); boolean omClientProxyExists = false; - for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) { + for (OMProxyInfo omProxyInfo : omProxies) { if (omProxyInfo.getAddress().equals(omRpcServerAddr)) { omClientProxyExists = true; break; @@ -674,7 +676,7 @@ public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { // Perform a manual failover of the proxy provider to move the // currentProxyIndex to a node other than the leader OM. omFailoverProxyProvider.performFailover( - omFailoverProxyProvider.getProxy().proxy); + (OzoneManagerProtocolPB) omFailoverProxyProvider.getProxy().proxy); String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);