HDDS-2019. Handle Set DtService of token in S3Gateway for OM HA. (#1489)
This commit is contained in:
parent
53ed78bcdb
commit
b09d389001
|
@ -62,6 +62,9 @@ public class OzoneClientProducer {
|
|||
@Inject
|
||||
private Text omService;
|
||||
|
||||
@Inject
|
||||
private String omServiceID;
|
||||
|
||||
|
||||
@Produces
|
||||
public OzoneClient createClient() throws IOException {
|
||||
|
@ -105,7 +108,13 @@ public class OzoneClientProducer {
|
|||
} catch (Exception e) {
|
||||
LOG.error("Error: ", e);
|
||||
}
|
||||
return OzoneClientFactory.getClient(ozoneConfiguration);
|
||||
|
||||
if (omServiceID == null) {
|
||||
return OzoneClientFactory.getClient(ozoneConfiguration);
|
||||
} else {
|
||||
// As in HA case, we need to pass om service ID.
|
||||
return OzoneClientFactory.getRpcClient(omServiceID, ozoneConfiguration);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -20,33 +20,75 @@ package org.apache.hadoop.ozone.s3;
|
|||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.s3.util.OzoneS3Util;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.inject.Produces;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
|
||||
|
||||
/**
|
||||
* This class creates the OM service .
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class OzoneServiceProvider {
|
||||
|
||||
private Text omServiceAdd;
|
||||
private Text omServiceAddr;
|
||||
|
||||
private String omserviceID;
|
||||
|
||||
@Inject
|
||||
private OzoneConfiguration conf;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
omServiceAdd = SecurityUtil.buildTokenService(OmUtils.
|
||||
getOmAddressForClients(conf));
|
||||
Collection<String> serviceIdList =
|
||||
conf.getTrimmedStringCollection(OZONE_OM_SERVICE_IDS_KEY);
|
||||
if (serviceIdList.size() == 0) {
|
||||
// Non-HA cluster
|
||||
omServiceAddr = SecurityUtil.buildTokenService(OmUtils.
|
||||
getOmAddressForClients(conf));
|
||||
} else {
|
||||
// HA cluster.
|
||||
//For now if multiple service id's are configured we throw exception.
|
||||
// As if multiple service id's are configured, S3Gateway will not be
|
||||
// knowing which one to talk to. In future, if OM federation is supported
|
||||
// we can resolve this by having another property like
|
||||
// ozone.om.internal.service.id.
|
||||
// TODO: Revisit this later.
|
||||
if (serviceIdList.size() > 1) {
|
||||
throw new IllegalArgumentException("Multiple serviceIds are " +
|
||||
"configured. " + Arrays.toString(serviceIdList.toArray()));
|
||||
} else {
|
||||
String serviceId = serviceIdList.iterator().next();
|
||||
Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, serviceId);
|
||||
if (omNodeIds.size() == 0) {
|
||||
throw new IllegalArgumentException(OZONE_OM_NODES_KEY
|
||||
+ "." + serviceId + " is not defined");
|
||||
}
|
||||
omServiceAddr = new Text(OzoneS3Util.buildServiceNameForToken(conf,
|
||||
serviceId, omNodeIds));
|
||||
omserviceID = serviceId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Produces
|
||||
public Text getService() {
|
||||
return omServiceAdd;
|
||||
return omServiceAddr;
|
||||
}
|
||||
|
||||
@Produces
|
||||
public String getOmServiceID() {
|
||||
return omserviceID;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,8 +19,17 @@
|
|||
package org.apache.hadoop.ozone.s3.util;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
|
||||
|
||||
/**
|
||||
* Ozone util for S3 related operations.
|
||||
*/
|
||||
|
@ -33,4 +42,39 @@ public final class OzoneS3Util {
|
|||
Objects.requireNonNull(userName);
|
||||
return DigestUtils.md5Hex(userName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate service Name for token.
|
||||
* @param configuration
|
||||
* @param serviceId - ozone manager service ID
|
||||
* @param omNodeIds - list of node ids for the given OM service.
|
||||
* @return service Name.
|
||||
*/
|
||||
public static String buildServiceNameForToken(
|
||||
@Nonnull OzoneConfiguration configuration, @Nonnull String serviceId,
|
||||
@Nonnull Collection<String> omNodeIds) {
|
||||
StringBuilder rpcAddress = new StringBuilder();
|
||||
|
||||
int nodesLength = omNodeIds.size();
|
||||
int counter = 0;
|
||||
for (String nodeId : omNodeIds) {
|
||||
counter++;
|
||||
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
|
||||
serviceId, nodeId);
|
||||
String rpcAddrStr = OmUtils.getOmRpcAddress(configuration, rpcAddrKey);
|
||||
if (rpcAddrStr == null || rpcAddrStr.isEmpty()) {
|
||||
throw new IllegalArgumentException("Could not find rpcAddress for " +
|
||||
OZONE_OM_ADDRESS_KEY + "." + serviceId + "." + nodeId);
|
||||
}
|
||||
|
||||
if (counter != nodesLength) {
|
||||
rpcAddress.append(SecurityUtil.buildTokenService(
|
||||
NetUtils.createSocketAddr(rpcAddrStr)) + ",");
|
||||
} else {
|
||||
rpcAddress.append(SecurityUtil.buildTokenService(
|
||||
NetUtils.createSocketAddr(rpcAddrStr)));
|
||||
}
|
||||
}
|
||||
return rpcAddress.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.s3.util;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Class used to test OzoneS3Util.
|
||||
*/
|
||||
public class TestOzoneS3Util {
|
||||
|
||||
|
||||
private OzoneConfiguration configuration;
|
||||
private String serviceID = "omService";
|
||||
|
||||
@Before
|
||||
public void setConf() {
|
||||
configuration = new OzoneConfiguration();
|
||||
|
||||
String nodeIDs = "om1,om2,om3";
|
||||
configuration.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, serviceID);
|
||||
configuration.set(OMConfigKeys.OZONE_OM_NODES_KEY + "." + serviceID,
|
||||
nodeIDs);
|
||||
configuration.setBoolean(HADOOP_SECURITY_TOKEN_SERVICE_USE_IP, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildServiceNameForToken() {
|
||||
|
||||
Collection<String> nodeIDList = OmUtils.getOMNodeIds(configuration,
|
||||
serviceID);
|
||||
|
||||
configuration.set(OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
|
||||
serviceID, "om1"), "om1:9862");
|
||||
configuration.set(OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
|
||||
serviceID, "om2"), "om2:9862");
|
||||
configuration.set(OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
|
||||
serviceID, "om3"), "om3:9862");
|
||||
|
||||
String expectedOmServiceAddress = buildServiceAddress(nodeIDList);
|
||||
|
||||
SecurityUtil.setConfiguration(configuration);
|
||||
String omserviceAddr = OzoneS3Util.buildServiceNameForToken(configuration,
|
||||
serviceID, nodeIDList);
|
||||
|
||||
Assert.assertEquals(expectedOmServiceAddress, omserviceAddr);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testBuildServiceNameForTokenIncorrectConfig() {
|
||||
|
||||
Collection<String> nodeIDList = OmUtils.getOMNodeIds(configuration,
|
||||
serviceID);
|
||||
|
||||
// Don't set om3 node rpc address. Here we are skipping setting of one of
|
||||
// the OM address. So buildServiceNameForToken will fail.
|
||||
configuration.set(OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
|
||||
serviceID, "om1"), "om1:9862");
|
||||
configuration.set(OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
|
||||
serviceID, "om2"), "om2:9862");
|
||||
|
||||
|
||||
SecurityUtil.setConfiguration(configuration);
|
||||
|
||||
try {
|
||||
OzoneS3Util.buildServiceNameForToken(configuration,
|
||||
serviceID, nodeIDList);
|
||||
fail("testBuildServiceNameForTokenIncorrectConfig failed");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
GenericTestUtils.assertExceptionContains("Could not find rpcAddress " +
|
||||
"for", ex);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Build serviceName from list of node ids.
|
||||
* @param nodeIDList
|
||||
* @return service name for token.
|
||||
*/
|
||||
private String buildServiceAddress(Collection<String> nodeIDList) {
|
||||
StringBuilder omServiceAddrBuilder = new StringBuilder();
|
||||
int nodesLength = nodeIDList.size();
|
||||
int counter = 0;
|
||||
for (String nodeID : nodeIDList) {
|
||||
counter++;
|
||||
String addr = configuration.get(OmUtils.addKeySuffixes(
|
||||
OMConfigKeys.OZONE_OM_ADDRESS_KEY, serviceID, nodeID));
|
||||
|
||||
if (counter != nodesLength) {
|
||||
omServiceAddrBuilder.append(addr + ",");
|
||||
} else {
|
||||
omServiceAddrBuilder.append(addr);
|
||||
}
|
||||
}
|
||||
|
||||
return omServiceAddrBuilder.toString();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue