HDDS-1072. Implement RetryProxy and FailoverProxy for OM client.

This commit is contained in:
Hanisha Koneru 2019-03-05 09:24:22 -08:00
parent 7b42e0e32a
commit 12402b7a74
21 changed files with 660 additions and 345 deletions

View File

@ -379,6 +379,23 @@ public final class OzoneConfigKeys {
public static final String OZONE_FS_ISOLATED_CLASSLOADER = public static final String OZONE_FS_ISOLATED_CLASSLOADER =
"ozone.fs.isolated-classloader"; "ozone.fs.isolated-classloader";
// Ozone Client Retry and Failover configurations
public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY =
"ozone.client.retry.max.attempts";
public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT =
10;
public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
"ozone.client.failover.max.attempts";
public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT =
15;
public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY =
"ozone.client.failover.sleep.base.millis";
public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT =
500;
public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
"ozone.client.failover.sleep.max.millis";
public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT =
15000;
public static final String OZONE_FREON_HTTP_ENABLED_KEY = public static final String OZONE_FREON_HTTP_ENABLED_KEY =
"ozone.freon.http.enabled"; "ozone.freon.http.enabled";

View File

@ -276,4 +276,7 @@ public final class OzoneConsts {
// Default OMServiceID for OM Ratis servers to use as RaftGroupId // Default OMServiceID for OM Ratis servers to use as RaftGroupId
public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault"; public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
// Dummy OMNodeID for OM Clients to use for a non-HA OM setup
public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
} }

View File

@ -2029,4 +2029,45 @@
</description> </description>
</property> </property>
<property>
<name>ozone.client.retry.max.attempts</name>
<value>10</value>
<description>
Max retry attempts for Ozone RpcClient talking to OzoneManagers.
</description>
</property>
<property>
<name>ozone.client.failover.max.attempts</name>
<value>15</value>
<description>
Expert only. The number of client failover attempts that should be
made before the failover is considered failed.
</description>
</property>
<property>
<name>ozone.client.failover.sleep.base.millis</name>
<value>500</value>
<description>
Expert only. The time to wait, in milliseconds, between failover
attempts increases exponentially as a function of the number of
attempts made so far, with a random factor of +/- 50%. This option
specifies the base value used in the failover calculation. The
first failover will retry immediately. The 2nd failover attempt
will delay at least ozone.client.failover.sleep.base.millis
milliseconds. And so on.
</description>
</property>
<property>
<name>ozone.client.failover.sleep.max.millis</name>
<value>15000</value>
<description>
Expert only. The time to wait, in milliseconds, between failover
attempts increases exponentially as a function of the number of
attempts made so far, with a random factor of +/- 50%. This option
specifies the maximum value to wait between failovers.
Specifically, the time between two failover attempts will not
exceed +/- 50% of ozone.client.failover.sleep.max.millis
milliseconds.
</description>
</property>
</configuration> </configuration>

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@ -510,5 +510,5 @@ public interface ClientProtocol {
S3SecretValue getS3Secret(String kerberosID) throws IOException; S3SecretValue getS3Secret(String kerberosID) throws IOException;
@VisibleForTesting @VisibleForTesting
OMProxyProvider getOMProxyProvider(); OMFailoverProxyProvider getOMProxyProvider();
} }

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.ozone.client.rest.headers.Header;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo; import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails; import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@ -725,7 +725,7 @@ public class RestClient implements ClientProtocol {
} }
@Override @Override
public OMProxyProvider getOMProxyProvider() { public OMFailoverProxyProvider getOMProxyProvider() {
return null; return null;
} }

View File

@ -50,8 +50,8 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -66,7 +66,8 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
@ -85,6 +86,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -107,7 +109,6 @@ public class RpcClient implements ClientProtocol {
private final OzoneConfiguration conf; private final OzoneConfiguration conf;
private final StorageContainerLocationProtocol private final StorageContainerLocationProtocol
storageContainerLocationClient; storageContainerLocationClient;
private final OMProxyProvider omProxyProvider;
private final OzoneManagerProtocol ozoneManagerClient; private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private final int chunkSize; private final int chunkSize;
@ -121,6 +122,7 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferMaxSize; private final long streamBufferMaxSize;
private final long blockSize; private final long blockSize;
private final long watchTimeout; private final long watchTimeout;
private final ClientId clientId = ClientId.randomId();
/** /**
* Creates RpcClient instance with the given configuration. * Creates RpcClient instance with the given configuration.
@ -135,13 +137,8 @@ public class RpcClient implements ClientProtocol {
OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT); OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
this.groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS, this.groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT); OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class, this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB(
ProtobufRpcEngine.class); this.conf, clientId.toString(), ugi);
this.omProxyProvider = new OMProxyProvider(conf, ugi);
this.ozoneManagerClient =
TracingUtil.createProxy(
this.omProxyProvider.getProxy(),
OzoneManagerProtocol.class);
long scmVersion = long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@ -492,8 +489,8 @@ public class RpcClient implements ClientProtocol {
@Override @Override
@VisibleForTesting @VisibleForTesting
public OMProxyProvider getOMProxyProvider() { public OMFailoverProxyProvider getOMProxyProvider() {
return omProxyProvider; return ozoneManagerClient.getOMFailoverProxyProvider();
} }
@Override @Override

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import java.net.InetSocketAddress;
/**
* Proxy information of OM.
*/
public final class OMProxyInfo {
private InetSocketAddress address;
private OzoneManagerProtocolClientSideTranslatorPB omClient;
public OMProxyInfo(InetSocketAddress addr) {
this.address = addr;
}
public InetSocketAddress getAddress() {
return address;
}
public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
return omClient;
}
public void setOMProxy(
OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
this.omClient = clientProxy;
}
}

View File

@ -1,177 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMProxyProvider implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(OMProxyProvider.class);
private List<OMProxyInfo> omProxies;
private int currentProxyIndex = 0;
private final Configuration conf;
private final long omVersion;
private final UserGroupInformation ugi;
private ClientId clientId = ClientId.randomId();
public OMProxyProvider(Configuration configuration,
UserGroupInformation ugi) {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
loadOMClientConfigs(conf);
}
private void loadOMClientConfigs(Configuration config) {
this.omProxies = new ArrayList<>();
Collection<String> omServiceIds = config.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY);
if (omServiceIds.size() > 1) {
throw new IllegalArgumentException("Multi-OM Services is not supported." +
" Please configure only one OM Service ID in " +
OZONE_OM_SERVICE_IDS_KEY);
}
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
if (rpcAddrStr == null) {
continue;
}
InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
// Add the OM client proxy info to list of proxies
if (addr != null) {
OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
omProxies.add(omProxyInfo);
} else {
LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
}
}
}
if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
}
private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
InetSocketAddress omAddress) throws IOException {
return new OzoneManagerProtocolClientSideTranslatorPB(
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)), clientId.toString());
}
/**
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
*/
public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
return createOMClientIfNeeded(currentOMProxyInfo);
}
private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
OMProxyInfo proxyInfo) {
if (proxyInfo.getOMProxy() == null) {
try {
proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
throw new RuntimeException(ioe);
}
}
return proxyInfo.getOMProxy();
}
/**
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
*/
public void performFailover() {
incrementProxyIndex();
}
synchronized void incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (OMProxyInfo proxy : omProxies) {
OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
if (omProxy != null) {
RPC.stopProxy(omProxy);
}
}
}
@VisibleForTesting
public List<OMProxyInfo> getOMProxies() {
return omProxies;
}
}

View File

@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.ha;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMFailoverProxyProvider implements
FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(OMFailoverProxyProvider.class);
// Map of OMNodeID to its proxy
private Map<String, OMProxyInfo> omProxies;
private List<String> omNodeIDList;
private String currentProxyOMNodeId;
private int currentProxyIndex;
private final Configuration conf;
private final long omVersion;
private final UserGroupInformation ugi;
public OMFailoverProxyProvider(OzoneConfiguration configuration,
UserGroupInformation ugi) throws IOException {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
loadOMClientConfigs(conf);
currentProxyIndex = 0;
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
}
/**
* Class to store proxy information.
*/
public final class OMProxyInfo
extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
private InetSocketAddress address;
OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
InetSocketAddress addr) {
super(proxy, proxyInfoStr);
this.address = addr;
}
public InetSocketAddress getAddress() {
return address;
}
}
private void loadOMClientConfigs(Configuration config) throws IOException {
this.omProxies = new HashMap<>();
this.omNodeIDList = new ArrayList<>();
Collection<String> omServiceIds = config.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY);
if (omServiceIds.size() > 1) {
throw new IllegalArgumentException("Multi-OM Services is not supported." +
" Please configure only one OM Service ID in " +
OZONE_OM_SERVICE_IDS_KEY);
}
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
if (rpcAddrStr == null) {
continue;
}
InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
// Add the OM client proxy info to list of proxies
if (addr != null) {
StringBuilder proxyInfo = new StringBuilder()
.append(nodeId).append("(")
.append(NetUtils.getHostPortString(addr)).append(")");
OMProxyInfo omProxyInfo = new OMProxyInfo(null,
proxyInfo.toString(), addr);
// For a non-HA OM setup, nodeId might be null. If so, we assign it
// a dummy value
if (nodeId == null) {
nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
}
omProxies.put(nodeId, omProxyInfo);
omNodeIDList.add(nodeId);
} else {
LOG.error("Failed to create OM proxy for {} at address {}",
nodeId, rpcAddrStr);
}
}
}
if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
}
@VisibleForTesting
public synchronized String getCurrentProxyOMNodeId() {
return currentProxyOMNodeId;
}
private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
throws IOException {
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf));
}
/**
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
*/
@Override
public synchronized OMProxyInfo getProxy() {
OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
createOMProxyIfNeeded(currentOMProxyInfo);
return currentOMProxyInfo;
}
/**
* Creates OM proxy object if it does not already exist.
*/
private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
if (proxyInfo.proxy == null) {
try {
proxyInfo.proxy = createOMProxy(proxyInfo.address);
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), proxyInfo.address, ioe);
throw new RuntimeException(ioe);
}
}
return proxyInfo;
}
/**
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
*/
@Override
public void performFailover(OzoneManagerProtocolPB currentProxy) {
int newProxyIndex = incrementProxyIndex();
LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
newProxyIndex, omNodeIDList.get(newProxyIndex));
}
/**
* Update the proxy index to the next proxy in the list.
* @return the new proxy index
*/
private synchronized int incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
return currentProxyIndex;
}
@Override
public Class<OzoneManagerProtocolPB> getInterface() {
return OzoneManagerProtocolPB.class;
}
/**
* Performs failover if the leaderOMNodeId returned through OMReponse does
* not match the current leaderOMNodeId cached by the proxy provider.
*/
public void performFailoverIfRequired(String newLeaderOMNodeId) {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
}
}
/**
* Failover to the OM proxy specified by the new leader OMNodeId.
* @param newLeaderOMNodeId OMNodeId to failover to.
* @return true if failover is successful, false otherwise.
*/
synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
if (omProxies.containsKey(newLeaderOMNodeId)) {
currentProxyOMNodeId = newLeaderOMNodeId;
currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
return true;
}
}
return false;
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (OMProxyInfo proxy : omProxies.values()) {
OzoneManagerProtocolPB omProxy = proxy.proxy;
if (omProxy != null) {
RPC.stopProxy(omProxy);
}
}
}
@VisibleForTesting
public List<OMProxyInfo> getOMProxies() {
return new ArrayList<>(omProxies.values());
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.ozone.client.rpc.ha; package org.apache.hadoop.ozone.om.ha;
/** /**
* This package contains Ozone Client's OM Proxy classes. * This package contains Ozone Client's OM Proxy classes.

View File

@ -15,8 +15,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.ozone.om.protocol; package org.apache.hadoop.ozone.om.protocol;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@ -384,5 +387,11 @@ public interface OzoneManagerProtocol
* @throws IOException * @throws IOException
*/ */
S3SecretValue getS3Secret(String kerberosID) throws IOException; S3SecretValue getS3Secret(String kerberosID) throws IOException;
/**
* Get the OM Client's Retry and Failover Proxy provider.
* @return OMFailoverProxyProvider
*/
OMFailoverProxyProvider getOMFailoverProxyProvider();
} }

View File

@ -17,18 +17,26 @@
*/ */
package org.apache.hadoop.ozone.om.protocolPB; package org.apache.hadoop.ozone.om.protocolPB;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@ -103,6 +111,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper; import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@ -112,6 +121,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@ -132,19 +144,90 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
*/ */
private static final RpcController NULL_RPC_CONTROLLER = null; private static final RpcController NULL_RPC_CONTROLLER = null;
private final OMFailoverProxyProvider omFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy; private final OzoneManagerProtocolPB rpcProxy;
private final String clientID; private final String clientID;
private static final Logger FAILOVER_PROXY_PROVIDER_LOG =
LoggerFactory.getLogger(OMFailoverProxyProvider.class);
public OzoneManagerProtocolClientSideTranslatorPB(
OzoneManagerProtocolPB proxy, String clientId) {
this.rpcProxy = proxy;
this.clientID = clientId;
this.omFailoverProxyProvider = null;
}
/** /**
* Constructor for KeySpaceManger Client. * Constructor for OM Protocol Client. This creates a {@link RetryProxy}
* @param rpcProxy * over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has
* one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the
* cluster.
*/ */
public OzoneManagerProtocolClientSideTranslatorPB( public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
OzoneManagerProtocolPB rpcProxy, String clientId) { String clientId, UserGroupInformation ugi) throws IOException {
this.rpcProxy = rpcProxy; this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
int maxRetries = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
int sleepBase = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
int sleepMax = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
this.rpcProxy = TracingUtil.createProxy(
createRetryProxy(omFailoverProxyProvider, maxRetries, maxFailovers,
sleepBase, sleepMax),
OzoneManagerProtocolPB.class);
this.clientID = clientId; this.clientID = clientId;
} }
/**
* Creates a {@link RetryProxy} encapsulating the
* {@link OMFailoverProxyProvider}. The retry proxy fails over on network
* exception or if the current proxy is not the leader OM.
*/
private OzoneManagerProtocolPB createRetryProxy(
OMFailoverProxyProvider failoverProxyProvider,
int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
RetryPolicy retryPolicyOnNetworkException = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailovers, maxRetries, delayMillis, maxDelayBase);
RetryPolicy retryPolicy = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception exception, int retries,
int failovers, boolean isIdempotentOrAtMostOnce)
throws Exception {
if (exception instanceof EOFException ||
exception instanceof ServiceException) {
if (retries < maxRetries && failovers < maxFailovers) {
return RetryAction.FAILOVER_AND_RETRY;
} else {
FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
"Attempted {} retries and {} failovers", retries, failovers);
return RetryAction.FAIL;
}
} else {
return retryPolicyOnNetworkException.shouldRetry(
exception, retries, failovers, isIdempotentOrAtMostOnce);
}
}
};
OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
return proxy;
}
@VisibleForTesting
public OMFailoverProxyProvider getOMFailoverProxyProvider() {
return omFailoverProxyProvider;
}
/** /**
* Closes this stream and releases any system resources associated * Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this * with it. If the stream is already closed then invoking this
@ -196,7 +279,19 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
OMRequest payload = OMRequest.newBuilder(omRequest) OMRequest payload = OMRequest.newBuilder(omRequest)
.setTraceID(TracingUtil.exportCurrentSpan()) .setTraceID(TracingUtil.exportCurrentSpan())
.build(); .build();
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
OMResponse omResponse =
rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
String leaderOmId = omResponse.getLeaderOMNodeId();
// Failover to the OM node returned by OMReponse leaderOMNodeId if
// current proxy is not pointing to that node.
omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
}
return omResponse;
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }

View File

@ -140,6 +140,8 @@ message OMResponse {
required Status status = 5; required Status status = 5;
optional string leaderOMNodeId = 6;
optional CreateVolumeResponse createVolumeResponse = 11; optional CreateVolumeResponse createVolumeResponse = 11;
optional SetVolumePropertyResponse setVolumePropertyResponse = 12; optional SetVolumePropertyResponse setVolumePropertyResponse = 12;
optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13; optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13;

View File

@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -48,6 +50,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class); LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
private Map<String, OzoneManager> ozoneManagerMap;
private List<OzoneManager> ozoneManagers; private List<OzoneManager> ozoneManagers;
private static final Random RANDOM = new Random(); private static final Random RANDOM = new Random();
@ -63,11 +66,12 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl( private MiniOzoneHAClusterImpl(
OzoneConfiguration conf, OzoneConfiguration conf,
List<OzoneManager> omList, Map<String, OzoneManager> omMap,
StorageContainerManager scm, StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) { List<HddsDatanodeService> hddsDatanodes) {
super(conf, scm, hddsDatanodes); super(conf, scm, hddsDatanodes);
this.ozoneManagers = omList; this.ozoneManagerMap = omMap;
this.ozoneManagers = new ArrayList<>(omMap.values());
} }
/** /**
@ -107,6 +111,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
ozoneManagers.get(index).stop(); ozoneManagers.get(index).stop();
} }
public void stopOzoneManager(String omNodeId) {
ozoneManagerMap.get(omNodeId).stop();
}
/** /**
* Builder for configuring the MiniOzoneCluster to run. * Builder for configuring the MiniOzoneCluster to run.
*/ */
@ -128,17 +136,17 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
DefaultMetricsSystem.setMiniClusterMode(true); DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration(); initializeConfiguration();
StorageContainerManager scm; StorageContainerManager scm;
List<OzoneManager> omList; Map<String, OzoneManager> omMap;
try { try {
scm = createSCM(); scm = createSCM();
scm.start(); scm.start();
omList = createOMService(); omMap = createOMService();
} catch (AuthenticationException ex) { } catch (AuthenticationException ex) {
throw new IOException("Unable to build MiniOzoneCluster. ", ex); throw new IOException("Unable to build MiniOzoneCluster. ", ex);
} }
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm); final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList, MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
scm, hddsDatanodes); scm, hddsDatanodes);
if (startDataNodes) { if (startDataNodes) {
cluster.startHddsDatanodes(); cluster.startHddsDatanodes();
@ -171,10 +179,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
* @throws IOException * @throws IOException
* @throws AuthenticationException * @throws AuthenticationException
*/ */
private List<OzoneManager> createOMService() throws IOException, private Map<String, OzoneManager> createOMService() throws IOException,
AuthenticationException { AuthenticationException {
List<OzoneManager> omList = new ArrayList<>(numOfOMs); Map<String, OzoneManager> omMap = new HashMap<>();
int retryCount = 0; int retryCount = 0;
int basePort = 10000; int basePort = 10000;
@ -186,10 +194,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
for (int i = 1; i<= numOfOMs; i++) { for (int i = 1; i<= numOfOMs; i++) {
// Set nodeId // Set nodeId
conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i); String nodeId = nodeIdBaseStr + i;
conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
// Set metadata/DB dir base path // Set metadata/DB dir base path
String metaDirPath = path + "/" + nodeIdBaseStr + i; String metaDirPath = path + "/" + nodeId;
conf.set(OZONE_METADATA_DIRS, metaDirPath); conf.set(OZONE_METADATA_DIRS, metaDirPath);
OMStorage omStore = new OMStorage(conf); OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore); initializeOmStorage(omStore);
@ -201,7 +210,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
OzoneManager om = OzoneManager.createOm(null, conf); OzoneManager om = OzoneManager.createOm(null, conf);
om.setCertClient(certClient); om.setCertClient(certClient);
omList.add(om); omMap.put(nodeId, om);
om.start(); om.start();
LOG.info("Started OzoneManager RPC server at " + LOG.info("Started OzoneManager RPC server at " +
@ -211,23 +220,24 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
// Set default OM address to point to the first OM. Clients would // Set default OM address to point to the first OM. Clients would
// try connecting to this address by default // try connecting to this address by default
conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr())); NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1)
.getOmRpcServerAddr()));
break; break;
} catch (BindException e) { } catch (BindException e) {
for (OzoneManager om : omList) { for (OzoneManager om : omMap.values()) {
om.stop(); om.stop();
om.join(); om.join();
LOG.info("Stopping OzoneManager server at " + LOG.info("Stopping OzoneManager server at " +
om.getOmRpcServerAddr()); om.getOmRpcServerAddr());
} }
omList.clear(); omMap.clear();
++retryCount; ++retryCount;
LOG.info("MiniOzoneHACluster port conflicts, retried " + LOG.info("MiniOzoneHACluster port conflicts, retried " +
retryCount + " times"); retryCount + " times");
} }
} }
return omList; return omMap;
} }
/** /**

View File

@ -65,8 +65,6 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rest.OzoneException; import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Container;
@ -76,6 +74,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@ -189,9 +188,10 @@ public abstract class TestOzoneRpcClientAbstract {
*/ */
@Test @Test
public void testOMClientProxyProvider() { public void testOMClientProxyProvider() {
OMProxyProvider omProxyProvider = store.getClientProxy() OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
.getOMProxyProvider(); .getOMProxyProvider();
List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies(); List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
omFailoverProxyProvider.getOMProxies();
// For a non-HA OM service, there should be only one OM proxy. // For a non-HA OM service, there should be only one OM proxy.
Assert.assertEquals(1, omProxies.size()); Assert.assertEquals(1, omProxies.size());

View File

@ -18,30 +18,29 @@ package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.log4j.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -49,6 +48,14 @@ import java.util.UUID;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
.NODE_FAILURE_TIMEOUT; .NODE_FAILURE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
@ -58,8 +65,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
public class TestOzoneManagerHA { public class TestOzoneManagerHA {
private MiniOzoneHAClusterImpl cluster = null; private MiniOzoneHAClusterImpl cluster = null;
private StorageHandler storageHandler; private ObjectStore objectStore;
private UserArgs userArgs;
private OzoneConfiguration conf; private OzoneConfiguration conf;
private String clusterId; private String clusterId;
private String scmId; private String scmId;
@ -69,7 +75,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none(); public ExpectedException exception = ExpectedException.none();
@Rule @Rule
public Timeout timeout = new Timeout(60_000); public Timeout timeout = new Timeout(120_000);
/** /**
* Create a MiniDFSCluster for testing. * Create a MiniDFSCluster for testing.
@ -85,6 +91,9 @@ public class TestOzoneManagerHA {
scmId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true); conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId) .setClusterId(clusterId)
@ -93,9 +102,7 @@ public class TestOzoneManagerHA {
.setNumOfOzoneManagers(numOfOMs) .setNumOfOzoneManagers(numOfOMs)
.build(); .build();
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
} }
/** /**
@ -115,7 +122,7 @@ public class TestOzoneManagerHA {
*/ */
@Test @Test
public void testAllOMNodesRunning() throws Exception { public void testAllOMNodesRunning() throws Exception {
testCreateVolume(true); createVolumeTest(true);
} }
/** /**
@ -126,52 +133,56 @@ public class TestOzoneManagerHA {
cluster.stopOzoneManager(1); cluster.stopOzoneManager(1);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2); Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
testCreateVolume(true); createVolumeTest(true);
} }
/** /**
* Test client request fails when 2 OMs are down. * Test client request fails when 2 OMs are down.
*/ */
@Test @Test
@Ignore("TODO:HDDS-1158")
public void testTwoOMNodesDown() throws Exception { public void testTwoOMNodesDown() throws Exception {
cluster.stopOzoneManager(1); cluster.stopOzoneManager(1);
cluster.stopOzoneManager(2); cluster.stopOzoneManager(2);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2); Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
testCreateVolume(false); createVolumeTest(false);
} }
/** /**
* Create a volume and test its attribute. * Create a volume and test its attribute.
*/ */
private void testCreateVolume(boolean checkSuccess) throws Exception { private void createVolumeTest(boolean checkSuccess) throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5); String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5); String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5); String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
createVolumeArgs.setUserName(userName); .setOwner(userName)
createVolumeArgs.setAdminName(adminName); .setAdmin(adminName)
.build();
try { try {
storageHandler.createVolume(createVolumeArgs); objectStore.createVolume(volumeName, createVolumeArgs);
VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs); OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
if (checkSuccess) { if (checkSuccess) {
Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName)); Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName)); Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
} else { } else {
// Verify that the request failed // Verify that the request failed
Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
Assert.fail("There is no quorum. Request should have failed"); Assert.fail("There is no quorum. Request should have failed");
} }
} catch (OMException e) { } catch (ConnectException | RemoteException e) {
if (!checkSuccess) { if (!checkSuccess) {
// If the last OM to be tried by the RetryProxy is down, we would get
// ConnectException. Otherwise, we would get a RemoteException from the
// last running OM as it would fail to get a quorum.
if (e instanceof RemoteException) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"RaftRetryFailureException", e); "RaftRetryFailureException", e);
}
} else { } else {
throw e; throw e;
} }
@ -179,14 +190,16 @@ public class TestOzoneManagerHA {
} }
/** /**
* Test that OMProxyProvider creates an OM proxy for each OM in the cluster. * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster.
*/ */
@Test @Test
public void testOMClientProxyProvide() throws Exception { public void testOMProxyProviderInitialization() throws Exception {
OzoneClient rpcClient = cluster.getRpcClient(); OzoneClient rpcClient = cluster.getRpcClient();
OMProxyProvider omProxyProvider = OMFailoverProxyProvider omFailoverProxyProvider =
rpcClient.getObjectStore().getClientProxy().getOMProxyProvider(); rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies(); List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
omFailoverProxyProvider.getOMProxies();
Assert.assertEquals(numOfOMs, omProxies.size()); Assert.assertEquals(numOfOMs, omProxies.size());
@ -194,7 +207,7 @@ public class TestOzoneManagerHA {
InetSocketAddress omRpcServerAddr = InetSocketAddress omRpcServerAddr =
cluster.getOzoneManager(i).getOmRpcServerAddr(); cluster.getOzoneManager(i).getOmRpcServerAddr();
boolean omClientProxyExists = false; boolean omClientProxyExists = false;
for (OMProxyInfo omProxyInfo : omProxies) { for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) { if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true; omClientProxyExists = true;
break; break;
@ -205,4 +218,99 @@ public class TestOzoneManagerHA {
omClientProxyExists); omClientProxyExists);
} }
} }
/**
* Test OMFailoverProxyProvider failover on connection exception to OM client.
*/
@Test
public void testOMProxyProviderFailoverOnConnectionFailure()
throws Exception {
OMFailoverProxyProvider omFailoverProxyProvider =
objectStore.getClientProxy().getOMProxyProvider();
String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
createVolumeTest(true);
// On stopping the current OM Proxy, the next connection attempt should
// failover to a another OM proxy.
cluster.stopOzoneManager(firstProxyNodeId);
Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4);
// Next request to the proxy provider should result in a failover
createVolumeTest(true);
Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
// Get the new OM Proxy NodeId
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Verify that a failover occured. the new proxy nodeId should be
// different from the old proxy nodeId.
Assert.assertNotEquals("Failover did not occur as expected",
firstProxyNodeId, newProxyNodeId);
}
/**
* Test OMFailoverProxyProvider failover when current OM proxy is not
* the current OM Leader.
*/
@Test
public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
OMFailoverProxyProvider omFailoverProxyProvider =
objectStore.getClientProxy().getOMProxyProvider();
// Run couple of createVolume tests to discover the current Leader OM
createVolumeTest(true);
createVolumeTest(true);
// The OMFailoverProxyProvider will point to the current leader OM node.
String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
// Perform a manual failover of the proxy provider to move the
// currentProxyIndex to a node other than the leader OM.
omFailoverProxyProvider.performFailover(
omFailoverProxyProvider.getProxy().proxy);
String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
// Once another request is sent to this new proxy node, the leader
// information must be returned via the response and a failover must
// happen to the leader proxy node.
createVolumeTest(true);
Thread.sleep(2000);
String newLeaderOMNodeId =
omFailoverProxyProvider.getCurrentProxyOMNodeId();
// The old and new Leader OM NodeId must match since there was no new
// election in the Ratis ring.
Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId);
}
@Test
public void testOMRetryProxy() throws Exception {
// Stop all the OMs. After making 5 (set maxRetries value) attempts at
// connection, the RpcClient should give up.
for (int i = 0; i < numOfOMs; i++) {
cluster.stopOzoneManager(i);
}
final LogVerificationAppender appender = new LogVerificationAppender();
final org.apache.log4j.Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
try {
createVolumeTest(true);
Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
} catch (ConnectException e) {
// Each retry attempt tries upto 10 times to connect. So there should be
// 3*10 "Retrying connect to server" messages
Assert.assertEquals(30,
appender.countLinesWithMessage("Retrying connect to server:"));
Assert.assertEquals(1,
appender.countLinesWithMessage("Failed to connect to OM. Attempted " +
"3 retries and 3 failovers"));
}
}
} }

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneIllegalArgumentException; import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@ -2610,4 +2611,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
public String getComponent() { public String getComponent() {
return omComponent; return omComponent;
} }
@Override
public OMFailoverProxyProvider getOMFailoverProxyProvider() {
return null;
}
} }

View File

@ -32,6 +32,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.retry.RetryPolicy;
@ -100,10 +101,12 @@ public final class OMRatisHelper {
return Message.valueOf(ByteString.copyFrom(requestBytes)); return Message.valueOf(ByteString.copyFrom(requestBytes));
} }
static OMResponse convertByteStringToOMResponse(ByteString byteString) static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
throws InvalidProtocolBufferException { throws InvalidProtocolBufferException {
byte[] bytes = byteString.toByteArray(); byte[] bytes = reply.getMessage().getContent().toByteArray();
return OMResponse.parseFrom(bytes); return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
.setLeaderOMNodeId(reply.getReplierId())
.build();
} }
static OMResponse getErrorResponse(Type cmdType, Exception e) { static OMResponse getErrorResponse(Type cmdType, Exception e) {

View File

@ -23,7 +23,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
@ -53,24 +56,24 @@ public final class OzoneManagerRatisClient implements Closeable {
OzoneManagerRatisClient.class); OzoneManagerRatisClient.class);
private final RaftGroup raftGroup; private final RaftGroup raftGroup;
private final String omID; private final String omNodeID;
private final RpcType rpcType; private final RpcType rpcType;
private RaftClient raftClient; private RaftClient raftClient;
private final RetryPolicy retryPolicy; private final RetryPolicy retryPolicy;
private final Configuration conf; private final Configuration conf;
private OzoneManagerRatisClient(String omId, RaftGroup raftGroup, private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup,
RpcType rpcType, RetryPolicy retryPolicy, RpcType rpcType, RetryPolicy retryPolicy,
Configuration config) { Configuration config) {
this.raftGroup = raftGroup; this.raftGroup = raftGroup;
this.omID = omId; this.omNodeID = omNodeId;
this.rpcType = rpcType; this.rpcType = rpcType;
this.retryPolicy = retryPolicy; this.retryPolicy = retryPolicy;
this.conf = config; this.conf = config;
} }
public static OzoneManagerRatisClient newOzoneManagerRatisClient( public static OzoneManagerRatisClient newOzoneManagerRatisClient(
String omId, RaftGroup raftGroup, Configuration conf) { String omNodeId, RaftGroup raftGroup, Configuration conf) {
final String rpcType = conf.get( final String rpcType = conf.get(
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY, OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT); OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
@ -87,19 +90,19 @@ public final class OzoneManagerRatisClient implements Closeable {
final RetryPolicy retryPolicy = RetryPolicies final RetryPolicy retryPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration); .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
return new OzoneManagerRatisClient(omId, raftGroup, return new OzoneManagerRatisClient(omNodeId, raftGroup,
SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf); SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
} }
public void connect() { public void connect() {
LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}", LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
raftGroup.getGroupId().getUuid().toString(), omID); raftGroup.getGroupId().getUuid().toString(), omNodeID);
// TODO : XceiverClient ratis should pass the config value of // TODO : XceiverClient ratis should pass the config value of
// maxOutstandingRequests so as to set the upper bound on max no of async // maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client // requests to be handled by raft client
raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup, raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup,
retryPolicy, conf); retryPolicy, conf);
} }
@ -119,13 +122,12 @@ public final class OzoneManagerRatisClient implements Closeable {
* @param request Request * @param request Request
* @return Response to the command * @return Response to the command
*/ */
public OMResponse sendCommand(OMRequest request) { public OMResponse sendCommand(OMRequest request) throws ServiceException {
try { try {
CompletableFuture<OMResponse> reply = sendCommandAsync(request); CompletableFuture<OMResponse> reply = sendCommandAsync(request);
return reply.get(); return reply.get();
} catch (ExecutionException | InterruptedException e) { } catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to execute command: " + request, e); throw new ServiceException(e);
return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
} }
} }
@ -152,9 +154,10 @@ public final class OzoneManagerRatisClient implements Closeable {
if (raftRetryFailureException != null) { if (raftRetryFailureException != null) {
throw new CompletionException(raftRetryFailureException); throw new CompletionException(raftRetryFailureException);
} }
OMResponse response = OMRatisHelper OMResponse response = OMRatisHelper
.convertByteStringToOMResponse(reply.getMessage() .getOMResponseFromRaftClientReply(reply);
.getContent());
return response; return response;
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new CompletionException(e); throw new CompletionException(e);

View File

@ -80,7 +80,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
/** /**
* Submits request to OM's Ratis server. * Submits request to OM's Ratis server.
*/ */
private OMResponse submitRequestToRatis(OMRequest request) { private OMResponse submitRequestToRatis(OMRequest request)
throws ServiceException {
return omRatisClient.sendCommand(request); return omRatisClient.sendCommand(request);
} }

View File

@ -34,8 +34,6 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMNodeDetails; import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest; .OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupId;
@ -109,27 +107,6 @@ public class TestOzoneManagerRatisServer {
LifeCycle.State.RUNNING, omRatisServer.getServerState()); LifeCycle.State.RUNNING, omRatisServer.getServerState());
} }
/**
* Submit any request to OM Ratis server and check that the dummy response
* message is received.
*/
@Test
public void testSubmitRatisRequest() throws Exception {
// Wait for leader election
Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
OMRequest request = OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
.setClientId(clientId)
.build();
OMResponse response = omRatisClient.sendCommand(request);
Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
response.getCmdType());
Assert.assertEquals(false, response.getSuccess());
Assert.assertEquals(false, response.hasCreateVolumeResponse());
}
/** /**
* Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
* categorized in {@link OmUtils#isReadOnly(OMRequest)}. * categorized in {@link OmUtils#isReadOnly(OMRequest)}.