HDDS-1072. Implement RetryProxy and FailoverProxy for OM client.

This commit is contained in:
Hanisha Koneru 2019-03-01 00:27:39 -08:00
parent 7a0db2f92b
commit 8e1225991d
21 changed files with 653 additions and 342 deletions

View File

@ -379,6 +379,23 @@ public final class OzoneConfigKeys {
public static final String OZONE_FS_ISOLATED_CLASSLOADER =
"ozone.fs.isolated-classloader";
// Ozone Client Retry and Failover configurations
public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY =
"ozone.client.retry.max.attempts";
public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT =
10;
public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
"ozone.client.failover.max.attempts";
public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT =
15;
public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY =
"ozone.client.failover.sleep.base.millis";
public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT =
500;
public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
"ozone.client.failover.sleep.max.millis";
public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT =
15000;
public static final String OZONE_FREON_HTTP_ENABLED_KEY =
"ozone.freon.http.enabled";

View File

@ -274,4 +274,7 @@ public final class OzoneConsts {
// Default OMServiceID for OM Ratis servers to use as RaftGroupId
public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
// Dummy OMNodeID for OM Clients to use for a non-HA OM setup
public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
}

View File

@ -2029,4 +2029,45 @@
</description>
</property>
</configuration>
<property>
<name>ozone.client.retry.max.attempts</name>
<value>10</value>
<description>
Max retry attempts for Ozone RpcClient talking to OzoneManagers.
</description>
</property>
<property>
<name>ozone.client.failover.max.attempts</name>
<value>15</value>
<description>
Expert only. The number of client failover attempts that should be
made before the failover is considered failed.
</description>
</property>
<property>
<name>ozone.client.failover.sleep.base.millis</name>
<value>500</value>
<description>
Expert only. The time to wait, in milliseconds, between failover
attempts increases exponentially as a function of the number of
attempts made so far, with a random factor of +/- 50%. This option
specifies the base value used in the failover calculation. The
first failover will retry immediately. The 2nd failover attempt
will delay at least ozone.client.failover.sleep.base.millis
milliseconds. And so on.
</description>
</property>
<property>
<name>ozone.client.failover.sleep.max.millis</name>
<value>15000</value>
<description>
Expert only. The time to wait, in milliseconds, between failover
attempts increases exponentially as a function of the number of
attempts made so far, with a random factor of +/- 50%. This option
specifies the maximum value to wait between failovers.
Specifically, the time between two failover attempts will not
exceed +/- 50% of ozone.client.failover.sleep.max.millis
milliseconds.
</description>
</property>
</configuration>

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@ -510,5 +510,5 @@ public interface ClientProtocol {
S3SecretValue getS3Secret(String kerberosID) throws IOException;
@VisibleForTesting
OMProxyProvider getOMProxyProvider();
OMFailoverProxyProvider getOMProxyProvider();
}

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.ozone.client.rest.headers.Header;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@ -725,7 +725,7 @@ public class RestClient implements ClientProtocol {
}
@Override
public OMProxyProvider getOMProxyProvider() {
public OMFailoverProxyProvider getOMProxyProvider() {
return null;
}

View File

@ -50,8 +50,8 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -66,6 +66,8 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
@ -85,6 +87,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.util.Strings;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -107,7 +110,6 @@ public class RpcClient implements ClientProtocol {
private final OzoneConfiguration conf;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final OMProxyProvider omProxyProvider;
private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
@ -121,6 +123,7 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferMaxSize;
private final long blockSize;
private final long watchTimeout;
private final ClientId clientId = ClientId.randomId();
/**
* Creates RpcClient instance with the given configuration.
@ -137,11 +140,8 @@ public class RpcClient implements ClientProtocol {
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
this.omProxyProvider = new OMProxyProvider(conf, ugi);
this.ozoneManagerClient =
TracingUtil.createProxy(
this.omProxyProvider.getProxy(),
OzoneManagerProtocol.class);
this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB(
this.conf, clientId.toString(), ugi);
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@ -492,8 +492,8 @@ public class RpcClient implements ClientProtocol {
@Override
@VisibleForTesting
public OMProxyProvider getOMProxyProvider() {
return omProxyProvider;
public OMFailoverProxyProvider getOMProxyProvider() {
return ozoneManagerClient.getOMFailoverProxyProvider();
}
@Override

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import java.net.InetSocketAddress;
/**
* Proxy information of OM.
*/
public final class OMProxyInfo {
private InetSocketAddress address;
private OzoneManagerProtocolClientSideTranslatorPB omClient;
public OMProxyInfo(InetSocketAddress addr) {
this.address = addr;
}
public InetSocketAddress getAddress() {
return address;
}
public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
return omClient;
}
public void setOMProxy(
OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
this.omClient = clientProxy;
}
}

View File

@ -1,177 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMProxyProvider implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(OMProxyProvider.class);
private List<OMProxyInfo> omProxies;
private int currentProxyIndex = 0;
private final Configuration conf;
private final long omVersion;
private final UserGroupInformation ugi;
private ClientId clientId = ClientId.randomId();
public OMProxyProvider(Configuration configuration,
UserGroupInformation ugi) {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
loadOMClientConfigs(conf);
}
private void loadOMClientConfigs(Configuration config) {
this.omProxies = new ArrayList<>();
Collection<String> omServiceIds = config.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY);
if (omServiceIds.size() > 1) {
throw new IllegalArgumentException("Multi-OM Services is not supported." +
" Please configure only one OM Service ID in " +
OZONE_OM_SERVICE_IDS_KEY);
}
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
if (rpcAddrStr == null) {
continue;
}
InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
// Add the OM client proxy info to list of proxies
if (addr != null) {
OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
omProxies.add(omProxyInfo);
} else {
LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
}
}
}
if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
}
private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
InetSocketAddress omAddress) throws IOException {
return new OzoneManagerProtocolClientSideTranslatorPB(
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)), clientId.toString());
}
/**
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
*/
public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
return createOMClientIfNeeded(currentOMProxyInfo);
}
private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
OMProxyInfo proxyInfo) {
if (proxyInfo.getOMProxy() == null) {
try {
proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
throw new RuntimeException(ioe);
}
}
return proxyInfo.getOMProxy();
}
/**
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
*/
public void performFailover() {
incrementProxyIndex();
}
synchronized void incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (OMProxyInfo proxy : omProxies) {
OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
if (omProxy != null) {
RPC.stopProxy(omProxy);
}
}
}
@VisibleForTesting
public List<OMProxyInfo> getOMProxies() {
return omProxies;
}
}

View File

@ -0,0 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.ha;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMFailoverProxyProvider implements
FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(OMFailoverProxyProvider.class);
// Map of OMNodeID to its proxy
private Map<String, OMProxyInfo> omProxies;
private List<String> omNodeIDList;
private String currentProxyOMNodeId;
private int currentProxyIndex;
private final Configuration conf;
private final long omVersion;
private final UserGroupInformation ugi;
public OMFailoverProxyProvider(OzoneConfiguration configuration,
UserGroupInformation ugi) throws IOException {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
loadOMClientConfigs(conf);
currentProxyIndex = 0;
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
}
/**
* Class to store proxy information.
*/
public final class OMProxyInfo
extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
private InetSocketAddress address;
OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
InetSocketAddress addr) {
super(proxy, proxyInfoStr);
this.address = addr;
}
public InetSocketAddress getAddress() {
return address;
}
}
private void loadOMClientConfigs(Configuration config) throws IOException {
this.omProxies = new HashMap<>();
this.omNodeIDList = new ArrayList<>();
Collection<String> omServiceIds = config.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY);
if (omServiceIds.size() > 1) {
throw new IllegalArgumentException("Multi-OM Services is not supported." +
" Please configure only one OM Service ID in " +
OZONE_OM_SERVICE_IDS_KEY);
}
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
if (rpcAddrStr == null) {
continue;
}
InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
// Add the OM client proxy info to list of proxies
if (addr != null) {
StringBuilder proxyInfo = new StringBuilder()
.append(nodeId).append("(")
.append(NetUtils.getHostPortString(addr)).append(")");
OMProxyInfo omProxyInfo = new OMProxyInfo(null,
proxyInfo.toString(), addr);
// For a non-HA OM setup, nodeId might be null. If so, we assign it
// a dummy value
if (nodeId == null) {
nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
}
omProxies.put(nodeId, omProxyInfo);
omNodeIDList.add(nodeId);
} else {
LOG.error("Failed to create OM proxy for {} at address {}",
nodeId, rpcAddrStr);
}
}
}
if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
}
@VisibleForTesting
public synchronized String getCurrentProxyOMNodeId() {
return currentProxyOMNodeId;
}
private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
throws IOException {
return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf));
}
/**
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
*/
@Override
public synchronized OMProxyInfo getProxy() {
OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
createOMProxyIfNeeded(currentOMProxyInfo);
return currentOMProxyInfo;
}
/**
* Creates OM proxy object if it does not already exist.
*/
private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
if (proxyInfo.proxy == null) {
try {
proxyInfo.proxy = createOMProxy(proxyInfo.address);
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), proxyInfo.address, ioe);
throw new RuntimeException(ioe);
}
}
return proxyInfo;
}
/**
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
*/
@Override
public void performFailover(OzoneManagerProtocolPB currentProxy) {
int newProxyIndex = incrementProxyIndex();
LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
newProxyIndex, omNodeIDList.get(newProxyIndex));
}
/**
* Update the proxy index to the next proxy in the list.
* @return the new proxy index
*/
private synchronized int incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
return currentProxyIndex;
}
@Override
public Class<OzoneManagerProtocolPB> getInterface() {
return OzoneManagerProtocolPB.class;
}
/**
* Performs failover if the leaderOMNodeId returned through OMReponse does
* not match the current leaderOMNodeId cached by the proxy provider.
*/
public void performFailoverIfRequired(String newLeaderOMNodeId) {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
}
}
/**
* Failover to the OM proxy specified by the new leader OMNodeId.
* @param newLeaderOMNodeId OMNodeId to failover to.
* @return true if failover is successful, false otherwise.
*/
synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
if (omProxies.containsKey(newLeaderOMNodeId)) {
currentProxyOMNodeId = newLeaderOMNodeId;
currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
return true;
}
}
return false;
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (OMProxyInfo proxy : omProxies.values()) {
OzoneManagerProtocolPB omProxy = proxy.proxy;
if (omProxy != null) {
RPC.stopProxy(omProxy);
}
}
}
@VisibleForTesting
public List<OMProxyInfo> getOMProxies() {
return new ArrayList<>(omProxies.values());
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
package org.apache.hadoop.ozone.om.ha;
/**
* This package contains Ozone Client's OM Proxy classes.

View File

@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@ -380,5 +381,11 @@ public interface OzoneManagerProtocol
* @throws IOException
*/
S3SecretValue getS3Secret(String kerberosID) throws IOException;
/**
* Get the OM Client's Retry and Failover Proxy provider.
* @return OMFailoverProxyProvider
*/
OMFailoverProxyProvider getOMFailoverProxyProvider();
}

View File

@ -17,17 +17,25 @@
*/
package org.apache.hadoop.ozone.om.protocolPB;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@ -102,6 +110,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@ -111,6 +120,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@ -131,19 +143,88 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
private final OMFailoverProxyProvider omFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
private final String clientID;
private static final Logger FAILOVER_PROXY_PROVIDER_LOG =
LoggerFactory.getLogger(OMFailoverProxyProvider.class);
public OzoneManagerProtocolClientSideTranslatorPB(
OzoneManagerProtocolPB proxy, String clientId) {
this.rpcProxy = proxy;
this.clientID = clientId;
this.omFailoverProxyProvider = null;
}
/**
* Constructor for KeySpaceManger Client.
* @param rpcProxy
* Constructor for OM Protocol Client. This creates a {@link RetryProxy}
* over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has
* one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the
* cluster.
*/
public OzoneManagerProtocolClientSideTranslatorPB(
OzoneManagerProtocolPB rpcProxy, String clientId) {
this.rpcProxy = rpcProxy;
public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
String clientId, UserGroupInformation ugi) throws IOException {
this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
int maxRetries = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
int sleepBase = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
int sleepMax = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
this.rpcProxy = createRetryProxy(omFailoverProxyProvider,
maxRetries, maxFailovers, sleepBase, sleepMax);
this.clientID = clientId;
}
/**
* Creates a {@link RetryProxy} encapsulating the
* {@link OMFailoverProxyProvider}. The retry proxy fails over on network
* exception or if the current proxy is not the leader OM.
*/
private OzoneManagerProtocolPB createRetryProxy(
OMFailoverProxyProvider failoverProxyProvider,
int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
RetryPolicy retryPolicyOnNetworkException = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailovers, maxRetries, delayMillis, maxDelayBase);
RetryPolicy retryPolicy = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception exception, int retries,
int failovers, boolean isIdempotentOrAtMostOnce)
throws Exception {
if (exception instanceof EOFException ||
exception instanceof ServiceException) {
if (retries < maxRetries && failovers < maxFailovers) {
return RetryAction.FAILOVER_AND_RETRY;
} else {
FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
"Attempted {} retries and {} failovers", retries, failovers);
return RetryAction.FAIL;
}
} else {
return retryPolicyOnNetworkException.shouldRetry(
exception, retries, failovers, isIdempotentOrAtMostOnce);
}
}
};
OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
return proxy;
}
@VisibleForTesting
public OMFailoverProxyProvider getOMFailoverProxyProvider() {
return omFailoverProxyProvider;
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
@ -195,7 +276,19 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
OMRequest payload = OMRequest.newBuilder(omRequest)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
OMResponse omResponse =
rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
String leaderOmId = omResponse.getLeaderOMNodeId();
// Failover to the OM node returned by OMReponse leaderOMNodeId if
// current proxy is not pointing to that node.
omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
}
return omResponse;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -140,6 +140,8 @@ message OMResponse {
required Status status = 5;
optional string leaderOMNodeId = 6;
optional CreateVolumeResponse createVolumeResponse = 11;
optional SetVolumePropertyResponse setVolumePropertyResponse = 12;
optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13;

View File

@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -48,6 +50,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
private Map<String, OzoneManager> ozoneManagerMap;
private List<OzoneManager> ozoneManagers;
private static final Random RANDOM = new Random();
@ -63,11 +66,12 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
List<OzoneManager> omList,
Map<String, OzoneManager> omMap,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
super(conf, scm, hddsDatanodes);
this.ozoneManagers = omList;
this.ozoneManagerMap = omMap;
this.ozoneManagers = new ArrayList<>(omMap.values());
}
/**
@ -107,6 +111,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
ozoneManagers.get(index).stop();
}
public void stopOzoneManager(String omNodeId) {
ozoneManagerMap.get(omNodeId).stop();
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@ -128,17 +136,17 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
List<OzoneManager> omList;
Map<String, OzoneManager> omMap;
try {
scm = createSCM();
scm.start();
omList = createOMService();
omMap = createOMService();
} catch (AuthenticationException ex) {
throw new IOException("Unable to build MiniOzoneCluster. ", ex);
}
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
@ -171,10 +179,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
* @throws IOException
* @throws AuthenticationException
*/
private List<OzoneManager> createOMService() throws IOException,
private Map<String, OzoneManager> createOMService() throws IOException,
AuthenticationException {
List<OzoneManager> omList = new ArrayList<>(numOfOMs);
Map<String, OzoneManager> omMap = new HashMap<>();
int retryCount = 0;
int basePort = 10000;
@ -186,10 +194,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
for (int i = 1; i<= numOfOMs; i++) {
// Set nodeId
conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
String nodeId = nodeIdBaseStr + i;
conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
// Set metadata/DB dir base path
String metaDirPath = path + "/" + nodeIdBaseStr + i;
String metaDirPath = path + "/" + nodeId;
conf.set(OZONE_METADATA_DIRS, metaDirPath);
OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore);
@ -201,7 +210,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
OzoneManager om = OzoneManager.createOm(null, conf);
om.setCertClient(certClient);
omList.add(om);
omMap.put(nodeId, om);
om.start();
LOG.info("Started OzoneManager RPC server at " +
@ -211,23 +220,24 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
// Set default OM address to point to the first OM. Clients would
// try connecting to this address by default
conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1)
.getOmRpcServerAddr()));
break;
} catch (BindException e) {
for (OzoneManager om : omList) {
for (OzoneManager om : omMap.values()) {
om.stop();
om.join();
LOG.info("Stopping OzoneManager server at " +
om.getOmRpcServerAddr());
}
omList.clear();
omMap.clear();
++retryCount;
LOG.info("MiniOzoneHACluster port conflicts, retried " +
retryCount + " times");
}
}
return omList;
return omMap;
}
/**

View File

@ -65,8 +65,6 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@ -76,6 +74,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@ -189,9 +188,10 @@ public abstract class TestOzoneRpcClientAbstract {
*/
@Test
public void testOMClientProxyProvider() {
OMProxyProvider omProxyProvider = store.getClientProxy()
OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
.getOMProxyProvider();
List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
omFailoverProxyProvider.getOMProxies();
// For a non-HA OM service, there should be only one OM proxy.
Assert.assertEquals(1, omProxies.size());

View File

@ -18,30 +18,29 @@ package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
@ -49,6 +48,14 @@ import java.util.UUID;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
.NODE_FAILURE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
@ -58,8 +65,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
public class TestOzoneManagerHA {
private MiniOzoneHAClusterImpl cluster = null;
private StorageHandler storageHandler;
private UserArgs userArgs;
private ObjectStore objectStore;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
@ -69,7 +75,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none();
@Rule
public Timeout timeout = new Timeout(60_000);
public Timeout timeout = new Timeout(120_000);
/**
* Create a MiniDFSCluster for testing.
@ -85,6 +91,9 @@ public class TestOzoneManagerHA {
scmId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
@ -93,9 +102,7 @@ public class TestOzoneManagerHA {
.setNumOfOzoneManagers(numOfOMs)
.build();
cluster.waitForClusterToBeReady();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
}
/**
@ -115,7 +122,7 @@ public class TestOzoneManagerHA {
*/
@Test
public void testAllOMNodesRunning() throws Exception {
testCreateVolume(true);
createVolumeTest(true);
}
/**
@ -126,52 +133,56 @@ public class TestOzoneManagerHA {
cluster.stopOzoneManager(1);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
testCreateVolume(true);
createVolumeTest(true);
}
/**
* Test client request fails when 2 OMs are down.
*/
@Test
@Ignore("TODO:HDDS-1158")
public void testTwoOMNodesDown() throws Exception {
cluster.stopOzoneManager(1);
cluster.stopOzoneManager(2);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
testCreateVolume(false);
createVolumeTest(false);
}
/**
* Create a volume and test its attribute.
*/
private void testCreateVolume(boolean checkSuccess) throws Exception {
private void createVolumeTest(boolean checkSuccess) throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
try {
storageHandler.createVolume(createVolumeArgs);
objectStore.createVolume(volumeName, createVolumeArgs);
VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
if (checkSuccess) {
Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
} else {
// Verify that the request failed
Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
Assert.fail("There is no quorum. Request should have failed");
}
} catch (OMException e) {
} catch (ConnectException | RemoteException e) {
if (!checkSuccess) {
GenericTestUtils.assertExceptionContains(
"RaftRetryFailureException", e);
// If the last OM to be tried by the RetryProxy is down, we would get
// ConnectException. Otherwise, we would get a RemoteException from the
// last running OM as it would fail to get a quorum.
if (e instanceof RemoteException) {
GenericTestUtils.assertExceptionContains(
"RaftRetryFailureException", e);
}
} else {
throw e;
}
@ -179,14 +190,16 @@ public class TestOzoneManagerHA {
}
/**
* Test that OMProxyProvider creates an OM proxy for each OM in the cluster.
* Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster.
*/
@Test
public void testOMClientProxyProvide() throws Exception {
public void testOMProxyProviderInitialization() throws Exception {
OzoneClient rpcClient = cluster.getRpcClient();
OMProxyProvider omProxyProvider =
OMFailoverProxyProvider omFailoverProxyProvider =
rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
omFailoverProxyProvider.getOMProxies();
Assert.assertEquals(numOfOMs, omProxies.size());
@ -194,7 +207,7 @@ public class TestOzoneManagerHA {
InetSocketAddress omRpcServerAddr =
cluster.getOzoneManager(i).getOmRpcServerAddr();
boolean omClientProxyExists = false;
for (OMProxyInfo omProxyInfo : omProxies) {
for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true;
break;
@ -205,4 +218,99 @@ public class TestOzoneManagerHA {
omClientProxyExists);
}
}
/**
* Test OMFailoverProxyProvider failover on connection exception to OM client.
*/
@Test
public void testOMProxyProviderFailoverOnConnectionFailure()
throws Exception {
OMFailoverProxyProvider omFailoverProxyProvider =
objectStore.getClientProxy().getOMProxyProvider();
String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
createVolumeTest(true);
// On stopping the current OM Proxy, the next connection attempt should
// failover to a another OM proxy.
cluster.stopOzoneManager(firstProxyNodeId);
Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4);
// Next request to the proxy provider should result in a failover
createVolumeTest(true);
Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
// Get the new OM Proxy NodeId
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Verify that a failover occured. the new proxy nodeId should be
// different from the old proxy nodeId.
Assert.assertNotEquals("Failover did not occur as expected",
firstProxyNodeId, newProxyNodeId);
}
/**
* Test OMFailoverProxyProvider failover when current OM proxy is not
* the current OM Leader.
*/
@Test
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
OMFailoverProxyProvider omFailoverProxyProvider =
objectStore.getClientProxy().getOMProxyProvider();
// Run couple of createVolume tests to discover the current Leader OM
createVolumeTest(true);
createVolumeTest(true);
// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Perform a manual failover of the proxy provider to move the
// currentProxyIndex to a node other than the leader OM.
omFailoverProxyProvider.performFailover(
omFailoverProxyProvider.getProxy().proxy);
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
// Once another request is sent to this new proxy node, the leader
// information must be returned via the response and a failover must
// happen to the leader proxy node.
createVolumeTest(true);
Thread.sleep(2000);
String newLeaderOMNodeId =
omFailoverProxyProvider.getCurrentProxyOMNodeId();
// The old and new Leader OM NodeId must match since there was no new
// election in the Ratis ring.
Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId);
}
@Test
public void testOMRetryProxy() throws Exception {
// Stop all the OMs. After making 5 (set maxRetries value) attempts at
// connection, the RpcClient should give up.
for (int i = 0; i < numOfOMs; i++) {
cluster.stopOzoneManager(i);
}
final LogVerificationAppender appender = new LogVerificationAppender();
final org.apache.log4j.Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
try {
createVolumeTest(true);
Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
} catch (ConnectException e) {
// Each retry attempt tries upto 10 times to connect. So there should be
// 3*10 "Retrying connect to server" messages
Assert.assertEquals(30,
appender.countLinesWithMessage("Retrying connect to server:"));
Assert.assertEquals(1,
appender.countLinesWithMessage("Failed to connect to OM. Attempted " +
"3 retries and 3 failovers"));
}
}
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@ -2461,4 +2462,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
public List<OMNodeDetails> getPeerNodes() {
return peerNodes;
}
@Override
public OMFailoverProxyProvider getOMFailoverProxyProvider() {
return null;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
@ -100,10 +101,12 @@ public final class OMRatisHelper {
return Message.valueOf(ByteString.copyFrom(requestBytes));
}
static OMResponse convertByteStringToOMResponse(ByteString byteString)
static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
throws InvalidProtocolBufferException {
byte[] bytes = byteString.toByteArray();
return OMResponse.parseFrom(bytes);
byte[] bytes = reply.getMessage().getContent().toByteArray();
return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
.setLeaderOMNodeId(reply.getReplierId())
.build();
}
static OMResponse getErrorResponse(Type cmdType, Exception e) {

View File

@ -23,7 +23,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@ -53,24 +56,24 @@ public final class OzoneManagerRatisClient implements Closeable {
OzoneManagerRatisClient.class);
private final RaftGroup raftGroup;
private final String omID;
private final String omNodeID;
private final RpcType rpcType;
private RaftClient raftClient;
private final RetryPolicy retryPolicy;
private final Configuration conf;
private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup,
RpcType rpcType, RetryPolicy retryPolicy,
Configuration config) {
this.raftGroup = raftGroup;
this.omID = omId;
this.omNodeID = omNodeId;
this.rpcType = rpcType;
this.retryPolicy = retryPolicy;
this.conf = config;
}
public static OzoneManagerRatisClient newOzoneManagerRatisClient(
String omId, RaftGroup raftGroup, Configuration conf) {
String omNodeId, RaftGroup raftGroup, Configuration conf) {
final String rpcType = conf.get(
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
@ -87,19 +90,19 @@ public final class OzoneManagerRatisClient implements Closeable {
final RetryPolicy retryPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
return new OzoneManagerRatisClient(omId, raftGroup,
return new OzoneManagerRatisClient(omNodeId, raftGroup,
SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
}
public void connect() {
LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
raftGroup.getGroupId().getUuid().toString(), omID);
raftGroup.getGroupId().getUuid().toString(), omNodeID);
// TODO : XceiverClient ratis should pass the config value of
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup,
retryPolicy, conf);
}
@ -119,13 +122,12 @@ public final class OzoneManagerRatisClient implements Closeable {
* @param request Request
* @return Response to the command
*/
public OMResponse sendCommand(OMRequest request) {
public OMResponse sendCommand(OMRequest request) throws ServiceException {
try {
CompletableFuture<OMResponse> reply = sendCommandAsync(request);
return reply.get();
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to execute command: " + request, e);
return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
throw new ServiceException(e);
}
}
@ -152,9 +154,10 @@ public final class OzoneManagerRatisClient implements Closeable {
if (raftRetryFailureException != null) {
throw new CompletionException(raftRetryFailureException);
}
OMResponse response = OMRatisHelper
.convertByteStringToOMResponse(reply.getMessage()
.getContent());
.getOMResponseFromRaftClientReply(reply);
return response;
} catch (InvalidProtocolBufferException e) {
throw new CompletionException(e);

View File

@ -80,7 +80,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
/**
* Submits request to OM's Ratis server.
*/
private OMResponse submitRequestToRatis(OMRequest request) {
private OMResponse submitRequestToRatis(OMRequest request)
throws ServiceException {
return omRatisClient.sendCommand(request);
}

View File

@ -34,8 +34,6 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
@ -109,27 +107,6 @@ public class TestOzoneManagerRatisServer {
LifeCycle.State.RUNNING, omRatisServer.getServerState());
}
/**
* Submit any request to OM Ratis server and check that the dummy response
* message is received.
*/
@Test
public void testSubmitRatisRequest() throws Exception {
// Wait for leader election
Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
OMRequest request = OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
.setClientId(clientId)
.build();
OMResponse response = omRatisClient.sendCommand(request);
Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
response.getCmdType());
Assert.assertEquals(false, response.getSuccess());
Assert.assertEquals(false, response.hasCreateVolumeResponse());
}
/**
* Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
* categorized in {@link OmUtils#isReadOnly(OMRequest)}.