HDDS-1109. Setup Failover Proxy Provider for OM client.

This commit is contained in:
Hanisha Koneru 2019-02-20 14:49:59 -08:00
parent 1bea785020
commit b1397ff9e4
9 changed files with 336 additions and 15 deletions

View File

@ -72,6 +72,11 @@ public class ObjectStore {
proxy = null;
}
@VisibleForTesting
public ClientProtocol getClientProxy() {
return proxy;
}
/**
* Creates the volume with default values.
* @param volumeName Name of the volume to be created.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client.protocol;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.io.Text;
@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@ -506,4 +508,7 @@ public interface ClientProtocol {
* @throws IOException
*/
S3SecretValue getS3Secret(String kerberosID) throws IOException;
@VisibleForTesting
OMProxyProvider getOMProxyProvider();
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.client.rest.headers.Header;
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@ -723,6 +724,11 @@ public class RestClient implements ClientProtocol {
"support this operation.");
}
@Override
public OMProxyProvider getOMProxyProvider() {
return null;
}
@Override
public OzoneInputStream getKey(
String volumeName, String bucketName, String keyName)

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client.rpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
@ -34,7 +35,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.*;
@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -82,7 +83,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.util.Strings;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -105,6 +105,7 @@ public class RpcClient implements ClientProtocol {
private final OzoneConfiguration conf;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private final OMProxyProvider omProxyProvider;
private final OzoneManagerProtocolClientSideTranslatorPB
ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
@ -118,7 +119,6 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferMaxSize;
private final long blockSize;
private final long watchTimeout;
private ClientId clientId = ClientId.randomId();
/**
* Creates RpcClient instance with the given configuration.
@ -133,17 +133,10 @@ public class RpcClient implements ClientProtocol {
OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
this.groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
long omVersion =
RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
InetSocketAddress omAddress = OmUtils
.getOmAddressForClients(conf);
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
this.ozoneManagerClient =
new OzoneManagerProtocolClientSideTranslatorPB(
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
omAddress, ugi, conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)), clientId.toString());
this.omProxyProvider = new OMProxyProvider(conf, ugi);
this.ozoneManagerClient = this.omProxyProvider.getProxy();
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@ -487,6 +480,12 @@ public class RpcClient implements ClientProtocol {
return ozoneManagerClient.getS3Secret(kerberosID);
}
@Override
@VisibleForTesting
public OMProxyProvider getOMProxyProvider() {
return omProxyProvider;
}
@Override
public void setBucketVersioning(
String volumeName, String bucketName, Boolean versioning)

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import java.net.InetSocketAddress;
/**
* Proxy information of OM.
*/
public final class OMProxyInfo {
private InetSocketAddress address;
private OzoneManagerProtocolClientSideTranslatorPB omClient;
public OMProxyInfo(InetSocketAddress addr) {
this.address = addr;
}
public InetSocketAddress getAddress() {
return address;
}
public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
return omClient;
}
public void setOMProxy(
OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
this.omClient = clientProxy;
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
/**
* A failover proxy provider implementation which allows clients to configure
* multiple OMs to connect to. In case of OM failover, client can try
* connecting to another OM node from the list of proxies.
*/
public class OMProxyProvider implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(OMProxyProvider.class);
private List<OMProxyInfo> omProxies;
private int currentProxyIndex = 0;
private final Configuration conf;
private final long omVersion;
private final UserGroupInformation ugi;
private ClientId clientId = ClientId.randomId();
public OMProxyProvider(Configuration configuration,
UserGroupInformation ugi) {
this.conf = configuration;
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
this.ugi = ugi;
loadOMClientConfigs(conf);
}
private void loadOMClientConfigs(Configuration config) {
this.omProxies = new ArrayList<>();
Collection<String> omServiceIds = config.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY);
if (omServiceIds.size() > 1) {
throw new IllegalArgumentException("Multi-OM Services is not supported." +
" Please configure only one OM Service ID in " +
OZONE_OM_SERVICE_IDS_KEY);
}
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
serviceId, nodeId);
String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
if (rpcAddrStr == null) {
continue;
}
InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
// Add the OM client proxy info to list of proxies
if (addr != null) {
OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
omProxies.add(omProxyInfo);
} else {
LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
}
}
}
if (omProxies.isEmpty()) {
throw new IllegalArgumentException("Could not find any configured " +
"addresses for OM. Please configure the system with "
+ OZONE_OM_ADDRESS_KEY);
}
}
private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
InetSocketAddress omAddress) throws IOException {
return new OzoneManagerProtocolClientSideTranslatorPB(
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)), clientId.toString());
}
/**
* Get the proxy object which should be used until the next failover event
* occurs. RPC proxy object is intialized lazily.
* @return the OM proxy object to invoke methods upon
*/
public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
return createOMClientIfNeeded(currentOMProxyInfo);
}
private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
OMProxyInfo proxyInfo) {
if (proxyInfo.getOMProxy() == null) {
try {
proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
} catch (IOException ioe) {
LOG.error("{} Failed to create RPC proxy to OM at {}",
this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
throw new RuntimeException(ioe);
}
}
return proxyInfo.getOMProxy();
}
/**
* Called whenever an error warrants failing over. It is determined by the
* retry policy.
*/
public void performFailover() {
incrementProxyIndex();
}
synchronized void incrementProxyIndex() {
currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* the proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (OMProxyInfo proxy : omProxies) {
OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
if (omProxy != null) {
RPC.stopProxy(omProxy);
}
}
}
@VisibleForTesting
public List<OMProxyInfo> getOMProxies() {
return omProxies;
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.ha;
/**
* This package contains Ozone Client's OM Proxy classes.
*/

View File

@ -63,6 +63,8 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@ -180,6 +182,23 @@ public abstract class TestOzoneRpcClientAbstract {
TestOzoneRpcClientAbstract.scmId = scmId;
}
/**
* Test OM Proxy Provider.
*/
@Test
public void testOMClientProxyProvider() {
OMProxyProvider omProxyProvider = store.getClientProxy()
.getOMProxyProvider();
List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
// For a non-HA OM service, there should be only one OM proxy.
Assert.assertEquals(1, omProxies.size());
// The address in OMProxyInfo object, which client will connect to,
// should match the OM's RPC address.
Assert.assertTrue(omProxies.get(0).getAddress().equals(
ozoneManager.getOmRpcServerAddr()));
}
@Test
public void testSetVolumeQuota()
throws IOException, OzoneException {

View File

@ -19,19 +19,29 @@ package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.*;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.*;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
.NODE_FAILURE_TIMEOUT;
@ -153,4 +163,32 @@ public class TestOzoneManagerHA {
Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
}
}
/**
* Test that OMProxyProvider creates an OM proxy for each OM in the cluster.
*/
@Test
public void testOMClientProxyProvide() throws Exception {
OzoneClient rpcClient = cluster.getRpcClient();
OMProxyProvider omProxyProvider =
rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
Assert.assertEquals(numOfOMs, omProxies.size());
for (int i = 0; i < numOfOMs; i++) {
InetSocketAddress omRpcServerAddr =
cluster.getOzoneManager(i).getOmRpcServerAddr();
boolean omClientProxyExists = false;
for (OMProxyInfo omProxyInfo : omProxies) {
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true;
break;
}
}
Assert.assertTrue("There is no OM Client Proxy corresponding to OM " +
"node" + cluster.getOzoneManager(i).getOMNodId(),
omClientProxyExists);
}
}
}