HDDS-1119. DN get OM certificate from SCM CA for block token validation. Contributed by Ajay Kumar. (#601)

This commit is contained in:
Ajay Yadav 2019-03-18 23:08:17 -07:00 committed by GitHub
parent 568d3ab8b6
commit f10d493325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 1093 additions and 600 deletions

View File

@ -45,6 +45,7 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
private String ipAddress;
private String hostName;
private List<Port> ports;
private String certSerialId;
/**
@ -54,13 +55,15 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
* @param ipAddress IP Address of this DataNode
* @param hostName DataNode's hostname
* @param ports Ports used by the DataNode
* @param certSerialId serial id from SCM issued certificate.
*/
private DatanodeDetails(String uuid, String ipAddress, String hostName,
List<Port> ports) {
List<Port> ports, String certSerialId) {
this.uuid = UUID.fromString(uuid);
this.ipAddress = ipAddress;
this.hostName = hostName;
this.ports = ports;
this.certSerialId = certSerialId;
}
protected DatanodeDetails(DatanodeDetails datanodeDetails) {
@ -177,6 +180,9 @@ public static DatanodeDetails getFromProtoBuf(
if (datanodeDetailsProto.hasHostName()) {
builder.setHostName(datanodeDetailsProto.getHostName());
}
if (datanodeDetailsProto.hasCertSerialId()) {
builder.setCertSerialId(datanodeDetailsProto.getCertSerialId());
}
for (HddsProtos.Port port : datanodeDetailsProto.getPortsList()) {
builder.addPort(newPort(
Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
@ -198,6 +204,9 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
if (hostName != null) {
builder.setHostName(hostName);
}
if (certSerialId != null) {
builder.setCertSerialId(certSerialId);
}
for (Port port : ports) {
builder.addPorts(HddsProtos.Port.newBuilder()
.setName(port.getName().toString())
@ -214,6 +223,7 @@ public String toString() {
ipAddress +
", host: " +
hostName +
", certSerialId: " + certSerialId +
"}";
}
@ -250,6 +260,7 @@ public static final class Builder {
private String ipAddress;
private String hostName;
private List<Port> ports;
private String certSerialId;
/**
* Default private constructor. To create Builder instance use
@ -304,6 +315,18 @@ public Builder addPort(Port port) {
return this;
}
/**
* Adds certificate serial id.
*
* @param certId Serial id of SCM issued certificate.
*
* @return DatanodeDetails.Builder
*/
public Builder setCertSerialId(String certId) {
this.certSerialId = certId;
return this;
}
/**
* Builds and returns DatanodeDetails instance.
*
@ -311,7 +334,7 @@ public Builder addPort(Port port) {
*/
public DatanodeDetails build() {
Preconditions.checkNotNull(id);
return new DatanodeDetails(id, ipAddress, hostName, ports);
return new DatanodeDetails(id, ipAddress, hostName, ports, certSerialId);
}
}
@ -398,4 +421,21 @@ public boolean equals(Object anObject) {
}
}
/**
* Returns serial id of SCM issued certificate.
*
* @return certificate serial id
*/
public String getCertSerialId() {
return certSerialId;
}
/**
* Set certificate serial id of SCM issued certificate.
*
*/
public void setCertSerialId(String certSerialId) {
this.certSerialId = certSerialId;
}
}

View File

@ -60,9 +60,9 @@ public UserGroupInformation verify(String user, String tokenStr)
if (conf.isBlockTokenEnabled()) {
// TODO: add audit logs.
if (Strings.isNullOrEmpty(tokenStr) || isTestStub()) {
if (Strings.isNullOrEmpty(tokenStr)) {
throw new BlockTokenException("Fail to find any token (empty or " +
"null.");
"null.)");
}
final Token<OzoneBlockTokenIdentifier> token = new Token();
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
@ -78,29 +78,26 @@ public UserGroupInformation verify(String user, String tokenStr)
throw new BlockTokenException("Failed to decode token : " + tokenStr);
}
// TODO: revisit this when caClient is ready, skip signature check now.
/**
* the final code should like
* if (caClient == null) {
* throw new SCMSecurityException("Certificate client not available to
* validate token");
* }
*/
if (caClient != null) {
X509Certificate singerCert = caClient.queryCertificate(
"certId=" + tokenId.getOmCertSerialId());
if (singerCert == null) {
throw new BlockTokenException("Can't find signer certificate " +
"(OmCertSerialId: " + tokenId.getOmCertSerialId() +
") of the block token for user: " + tokenId.getUser());
}
Boolean validToken = caClient.verifySignature(tokenId.getBytes(),
token.getPassword(), singerCert);
if (!validToken) {
throw new BlockTokenException("Invalid block token for user: " +
tokenId.getUser());
}
if (caClient == null) {
throw new SCMSecurityException("Certificate client not available " +
"to validate token");
}
X509Certificate singerCert;
singerCert = caClient.getCertificate(tokenId.getOmCertSerialId());
if (singerCert == null) {
throw new BlockTokenException("Can't find signer certificate " +
"(OmCertSerialId: " + tokenId.getOmCertSerialId() +
") of the block token for user: " + tokenId.getUser());
}
boolean validToken = caClient.verifySignature(tokenId.getBytes(),
token.getPassword(), singerCert);
if (!validToken) {
throw new BlockTokenException("Invalid block token for user: " +
tokenId.getUser());
}
// check expiration
if (isExpired(tokenId.getExpiryDate())) {
UserGroupInformation tokenUser = tokenId.getUser();

View File

@ -54,8 +54,17 @@ public interface CertificateClient {
/**
* Returns the certificate of the specified component if it exists on the
* local system.
* @param certSerialId
*
* @return certificate or Null if there is no data.
*/
X509Certificate getCertificate(String certSerialId)
throws CertificateException;
/**
* Returns the certificate of the specified component if it exists on the
* local system.
*
* @return certificate or Null if there is no data.
*/
X509Certificate getCertificate();
@ -121,13 +130,15 @@ boolean verifySignature(byte[] data, byte[] signature,
X509Certificate queryCertificate(String query);
/**
* Stores the Certificate.
* Stores the Certificate for this client. Don't use this api to add
* trusted certificates of others.
*
* @param certificate - X509 Certificate
* @param pemEncodedCert - pem encoded X509 Certificate
* @param force - override any existing file
* @throws CertificateException - on Error.
*
*/
void storeCertificate(X509Certificate certificate)
void storeCertificate(String pemEncodedCert, boolean force)
throws CertificateException;
/**

View File

@ -32,8 +32,13 @@ public class DNCertificateClient extends DefaultCertificateClient {
private static final Logger LOG =
LoggerFactory.getLogger(DNCertificateClient.class);
public DNCertificateClient(SecurityConfig securityConfig,
String certSerialId) {
super(securityConfig, LOG, certSerialId);
}
public DNCertificateClient(SecurityConfig securityConfig) {
super(securityConfig, LOG);
super(securityConfig, LOG, null);
}
/**

View File

@ -28,13 +28,26 @@
import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.bouncycastle.cert.X509CertificateHolder;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@ -47,11 +60,12 @@
import java.security.Signature;
import java.security.SignatureException;
import java.security.cert.CertStore;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;
import java.security.spec.InvalidKeySpecException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.GETCERT;
@ -65,24 +79,75 @@
*/
public abstract class DefaultCertificateClient implements CertificateClient {
private static final String CERT_FILE_NAME_FORMAT = "%s.crt";
private final Logger logger;
private final SecurityConfig securityConfig;
private final KeyCodec keyCodec;
private PrivateKey privateKey;
private PublicKey publicKey;
private X509Certificate x509Certificate;
private Map<String, X509Certificate> certificateMap;
private String certSerialId;
DefaultCertificateClient(SecurityConfig securityConfig, Logger log) {
DefaultCertificateClient(SecurityConfig securityConfig, Logger log,
String certSerialId) {
Objects.requireNonNull(securityConfig);
this.securityConfig = securityConfig;
keyCodec = new KeyCodec(securityConfig);
this.logger = log;
this.certificateMap = new ConcurrentHashMap<>();
this.certSerialId = certSerialId;
loadAllCertificates();
}
/**
* Returns the private key of the specified component if it exists on the
* local system.
* Load all certificates from configured location.
* */
private void loadAllCertificates() {
// See if certs directory exists in file system.
Path certPath = securityConfig.getCertificateLocation();
if (Files.exists(certPath) && Files.isDirectory(certPath)) {
getLogger().info("Loading certificate from location:{}.",
certPath);
File[] certFiles = certPath.toFile().listFiles();
if (certFiles != null) {
CertificateCodec certificateCodec =
new CertificateCodec(securityConfig);
for (File file : certFiles) {
if (file.isFile()) {
try {
X509CertificateHolder x509CertificateHolder = certificateCodec
.readCertificate(certPath, file.getName());
X509Certificate cert =
CertificateCodec.getX509Certificate(x509CertificateHolder);
if (cert != null && cert.getSerialNumber() != null) {
if (cert.getSerialNumber().toString().equals(certSerialId)) {
x509Certificate = cert;
}
certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
cert);
getLogger().info("Added certificate from file:{}.",
file.getAbsolutePath());
} else {
getLogger().error("Error reading certificate from file:{}",
file);
}
} catch (java.security.cert.CertificateException | IOException e) {
getLogger().error("Error reading certificate from file:{}.",
file.getAbsolutePath(), e);
}
}
}
}
}
}
/**
* Returns the private key of the specified if it exists on the local
* system.
*
* @return private key or Null if there is no data.
*/
@ -106,8 +171,7 @@ public PrivateKey getPrivateKey() {
}
/**
* Returns the public key of the specified component if it exists on the
* local system.
* Returns the public key of the specified if it exists on the local system.
*
* @return public key or Null if there is no data.
*/
@ -131,34 +195,72 @@ public PublicKey getPublicKey() {
}
/**
* Returns the certificate of the specified component if it exists on the
* local system.
* Returns the default certificate of given client if it exists.
*
* @return certificate or Null if there is no data.
*/
@Override
public X509Certificate getCertificate() {
if(x509Certificate != null){
if (x509Certificate != null) {
return x509Certificate;
}
Path certPath = securityConfig.getCertificateLocation();
if (OzoneSecurityUtil.checkIfFileExist(certPath,
securityConfig.getCertificateFileName())) {
CertificateCodec certificateCodec =
new CertificateCodec(securityConfig);
try {
X509CertificateHolder x509CertificateHolder =
certificateCodec.readCertificate();
x509Certificate =
CertificateCodec.getX509Certificate(x509CertificateHolder);
} catch (java.security.cert.CertificateException | IOException e) {
getLogger().error("Error reading certificate.", e);
}
if (certSerialId == null) {
getLogger().error("Default certificate serial id is not set. Can't " +
"locate the default certificate for this client.");
return null;
}
// Refresh the cache from file system.
loadAllCertificates();
if (certificateMap.containsKey(certSerialId)) {
x509Certificate = certificateMap.get(certSerialId);
}
return x509Certificate;
}
/**
* Returns the certificate with the specified certificate serial id if it
* exists else try to get it from SCM.
* @param certId
*
* @return certificate or Null if there is no data.
*/
@Override
public X509Certificate getCertificate(String certId)
throws CertificateException {
// Check if it is in cache.
if (certificateMap.containsKey(certId)) {
return certificateMap.get(certId);
}
// Try to get it from SCM.
return this.getCertificateFromScm(certId);
}
/**
* Get certificate from SCM and store it in local file system.
* @param certId
* @return certificate
*/
private X509Certificate getCertificateFromScm(String certId)
throws CertificateException {
getLogger().info("Getting certificate with certSerialId:{}.",
certId);
try {
SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
(OzoneConfiguration) securityConfig.getConfiguration());
String pemEncodedCert =
scmSecurityProtocolClient.getCertificate(certId);
this.storeCertificate(pemEncodedCert, true);
return CertificateCodec.getX509Certificate(pemEncodedCert);
} catch (Exception e) {
getLogger().error("Error while getting Certificate with " +
"certSerialId:{} from scm.", certId, e);
throw new CertificateException("Error while getting certificate for " +
"certSerialId:" + certId, e, CERTIFICATE_ERROR);
}
}
/**
* Verifies if this certificate is part of a trusted chain.
*
@ -171,8 +273,7 @@ public boolean verifyCertificate(X509Certificate certificate) {
}
/**
* Creates digital signature over the data stream using the components
* private key.
* Creates digital signature over the data stream using the s private key.
*
* @param stream - Data stream to sign.
* @throws CertificateException - on Error.
@ -200,10 +301,9 @@ public byte[] signDataStream(InputStream stream)
}
/**
* Creates digital signature over the data stream using the components
* private key.
* Creates digital signature over the data stream using the s private key.
*
* @param data - Data to sign.
* @param data - Data to sign.
* @throws CertificateException - on Error.
*/
@Override
@ -349,29 +449,39 @@ public X509Certificate queryCertificate(String query) {
}
/**
* Stores the Certificate for this client. Don't use this api to add
* trusted certificates of other components.
* Stores the Certificate for this client. Don't use this api to add trusted
* certificates of others.
*
* @param certificate - X509 Certificate
* @param pemEncodedCert - pem encoded X509 Certificate
* @param force - override any existing file
* @throws CertificateException - on Error.
*
*/
@Override
public void storeCertificate(X509Certificate certificate)
public void storeCertificate(String pemEncodedCert, boolean force)
throws CertificateException {
CertificateCodec certificateCodec = new CertificateCodec(securityConfig);
try {
certificateCodec.writeCertificate(
new X509CertificateHolder(certificate.getEncoded()));
} catch (IOException | CertificateEncodingException e) {
Path basePath = securityConfig.getCertificateLocation();
X509Certificate cert =
CertificateCodec.getX509Certificate(pemEncodedCert);
String certName = String.format(CERT_FILE_NAME_FORMAT,
cert.getSerialNumber().toString());
certificateCodec.writeCertificate(basePath, certName,
pemEncodedCert, force);
certificateMap.putIfAbsent(cert.getSerialNumber().toString(), cert);
} catch (IOException | java.security.cert.CertificateException e) {
throw new CertificateException("Error while storing certificate.", e,
CERTIFICATE_ERROR);
}
}
/**
* Stores the trusted chain of certificates for a specific component.
* Stores the trusted chain of certificates for a specific .
*
* @param ks - Key Store.
* @param ks - Key Store.
* @throws CertificateException - on Error.
*/
@Override
@ -382,7 +492,7 @@ public synchronized void storeTrustChain(CertStore ks)
/**
* Stores the trusted chain of certificates for a specific component.
* Stores the trusted chain of certificates for a specific .
*
* @param certificates - List of Certificates.
* @throws CertificateException - on Error.
@ -640,4 +750,26 @@ protected KeyPair createKeyPair() throws CertificateException {
public Logger getLogger() {
return logger;
}
/**
* Create a scm security client, used to get SCM signed certificate.
*
* @return {@link SCMSecurityProtocol}
*/
private static SCMSecurityProtocol getScmSecurityClient(
OzoneConfiguration conf) throws IOException {
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
ProtobufRpcEngine.class);
long scmVersion =
RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
InetSocketAddress scmSecurityProtoAdd =
HddsUtils.getScmAddressForSecurityProtocol(conf);
SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
new SCMSecurityProtocolClientSideTranslatorPB(
RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
scmSecurityProtoAdd, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
return scmSecurityClient;
}
}

View File

@ -39,8 +39,13 @@ public class OMCertificateClient extends DefaultCertificateClient {
private static final Logger LOG =
LoggerFactory.getLogger(OMCertificateClient.class);
public OMCertificateClient(SecurityConfig securityConfig,
String certSerialId) {
super(securityConfig, LOG, certSerialId);
}
public OMCertificateClient(SecurityConfig securityConfig) {
super(securityConfig, LOG);
super(securityConfig, LOG, null);
}
protected InitResponse handleCase(InitCase init) throws

View File

@ -83,6 +83,7 @@ public enum ErrorCode {
CERTIFICATE_ERROR,
BOOTSTRAP_ERROR,
CSR_ERROR,
CRYPTO_SIGNATURE_VERIFICATION_ERROR
CRYPTO_SIGNATURE_VERIFICATION_ERROR,
CERTIFICATE_NOT_FOUND_ERROR
}
}

View File

@ -245,5 +245,17 @@ public void initialize() throws IOException {
storageInfo.writeTo(getVersionFile());
}
/**
* Persists current StorageInfo to file system..
* @throws IOException
*/
public void persistCurrentState() throws IOException {
if (!getCurrentDir().exists()) {
throw new IOException("Metadata dir doesn't exist, dir: " +
getCurrentDir());
}
storageInfo.writeTo(getVersionFile());
}
}

View File

@ -34,6 +34,7 @@ public class CodecRegistry {
public CodecRegistry() {
valueCodecs = new HashMap<>();
valueCodecs.put(String.class, new StringCodec());
valueCodecs.put(Long.class, new LongCodec());
}
/**

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.utils.db;
import com.google.common.primitives.Longs;
/**
* Codec to convert Long to/from byte array.
*/
public class LongCodec implements Codec<Long> {
@Override
public byte[] toPersistedFormat(Long object) {
if (object != null) {
return Longs.toByteArray(object);
} else {
return null;
}
}
@Override
public Long fromPersistedFormat(byte[] rawData) {
if (rawData != null) {
return Longs.fromByteArray(rawData);
} else {
return null;
}
}
}

View File

@ -33,6 +33,7 @@ message DatanodeDetailsProto {
required string ipAddress = 2; // IP address
required string hostName = 3; // hostname
repeated Port ports = 4;
optional string certSerialId = 5; // Certificate serial id.
}
/**

View File

@ -59,12 +59,15 @@
@SuppressWarnings("visibilitymodifier")
public class TestCertificateClientInit {
private KeyPair keyPair;
private String certSerialId = "3284792342234";
private CertificateClient dnCertificateClient;
private CertificateClient omCertificateClient;
private HDDSKeyGenerator keyGenerator;
private Path metaDirPath;
private SecurityConfig securityConfig;
private KeyCodec keyCodec;
private X509Certificate x509Certificate;
@Parameter
public boolean pvtKeyPresent;
@ -96,10 +99,16 @@ public void setUp() throws Exception {
metaDirPath = Paths.get(path, "test");
config.set(HDDS_METADATA_DIR_NAME, metaDirPath.toString());
securityConfig = new SecurityConfig(config);
dnCertificateClient = new DNCertificateClient(securityConfig);
omCertificateClient = new OMCertificateClient(securityConfig);
keyGenerator = new HDDSKeyGenerator(securityConfig);
keyPair = keyGenerator.generateKey();
x509Certificate = getX509Certificate();
certSerialId = x509Certificate.getSerialNumber().toString();
dnCertificateClient = new DNCertificateClient(securityConfig,
certSerialId);
omCertificateClient = new OMCertificateClient(securityConfig,
certSerialId);
keyCodec = new KeyCodec(securityConfig);
Files.createDirectories(securityConfig.getKeyLocation());
}
@ -113,7 +122,6 @@ public void tearDown() {
@Test
public void testInitDatanode() throws Exception {
KeyPair keyPair = keyGenerator.generateKey();
if (pvtKeyPresent) {
keyCodec.writePrivateKey(keyPair.getPrivate());
} else {
@ -131,9 +139,6 @@ public void testInitDatanode() throws Exception {
}
if (certPresent) {
X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
"CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
CertificateCodec codec = new CertificateCodec(securityConfig);
codec.writeCertificate(new X509CertificateHolder(
x509Certificate.getEncoded()));
@ -157,7 +162,6 @@ public void testInitDatanode() throws Exception {
@Test
public void testInitOzoneManager() throws Exception {
KeyPair keyPair = keyGenerator.generateKey();
if (pvtKeyPresent) {
keyCodec.writePrivateKey(keyPair.getPrivate());
} else {
@ -175,9 +179,6 @@ public void testInitOzoneManager() throws Exception {
}
if (certPresent) {
X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
"CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
CertificateCodec codec = new CertificateCodec(securityConfig);
codec.writeCertificate(new X509CertificateHolder(
x509Certificate.getEncoded()));
@ -202,4 +203,9 @@ public void testInitOzoneManager() throws Exception {
securityConfig.getPublicKeyFileName()));
}
}
private X509Certificate getX509Certificate() throws Exception {
return KeyStoreTestUtil.generateCertificate(
"CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.security.x509.certificate.client;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
import org.bouncycastle.cert.X509CertificateHolder;
import org.junit.After;
@ -49,8 +50,11 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getPEMEncodedString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -62,37 +66,60 @@
*/
public class TestDefaultCertificateClient {
private String certSerialId;
private X509Certificate x509Certificate;
private OMCertificateClient omCertClient;
private DNCertificateClient dnCertClient;
private HDDSKeyGenerator keyGenerator;
private Path metaDirPath;
private SecurityConfig securityConfig;
private Path omMetaDirPath;
private Path dnMetaDirPath;
private SecurityConfig omSecurityConfig;
private SecurityConfig dnSecurityConfig;
private final static String UTF = "UTF-8";
private KeyCodec keyCodec;
private KeyCodec omKeyCodec;
private KeyCodec dnKeyCodec;
@Before
public void setUp() throws Exception {
OzoneConfiguration config = new OzoneConfiguration();
final String path = GenericTestUtils
config.setStrings(OZONE_SCM_NAMES, "localhost");
config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
final String omPath = GenericTestUtils
.getTempPath(UUID.randomUUID().toString());
metaDirPath = Paths.get(path, "test");
config.set(HDDS_METADATA_DIR_NAME, metaDirPath.toString());
securityConfig = new SecurityConfig(config);
final String dnPath = GenericTestUtils
.getTempPath(UUID.randomUUID().toString());
omMetaDirPath = Paths.get(omPath, "test");
dnMetaDirPath = Paths.get(dnPath, "test");
config.set(HDDS_METADATA_DIR_NAME, omMetaDirPath.toString());
omSecurityConfig = new SecurityConfig(config);
config.set(HDDS_METADATA_DIR_NAME, dnMetaDirPath.toString());
dnSecurityConfig = new SecurityConfig(config);
keyGenerator = new HDDSKeyGenerator(omSecurityConfig);
omKeyCodec = new KeyCodec(omSecurityConfig);
dnKeyCodec = new KeyCodec(dnSecurityConfig);
Files.createDirectories(omSecurityConfig.getKeyLocation());
Files.createDirectories(dnSecurityConfig.getKeyLocation());
x509Certificate = generateX509Cert(null);
certSerialId = x509Certificate.getSerialNumber().toString();
getCertClient();
keyGenerator = new HDDSKeyGenerator(securityConfig);
keyCodec = new KeyCodec(securityConfig);
Files.createDirectories(securityConfig.getKeyLocation());
}
private void getCertClient() {
omCertClient = new OMCertificateClient(securityConfig);
dnCertClient = new DNCertificateClient(securityConfig);
omCertClient = new OMCertificateClient(omSecurityConfig, certSerialId);
dnCertClient = new DNCertificateClient(dnSecurityConfig, certSerialId);
}
@After
public void tearDown() {
omCertClient = null;
FileUtils.deleteQuietly(metaDirPath.toFile());
dnCertClient = null;
FileUtils.deleteQuietly(omMetaDirPath.toFile());
FileUtils.deleteQuietly(dnMetaDirPath.toFile());
}
/**
@ -101,6 +128,7 @@ public void tearDown() {
*/
@Test
public void testKeyOperations() throws Exception {
cleanupOldKeyPair();
PrivateKey pvtKey = omCertClient.getPrivateKey();
PublicKey publicKey = omCertClient.getPublicKey();
assertNull(publicKey);
@ -111,18 +139,33 @@ public void testKeyOperations() throws Exception {
assertNotNull(pvtKey);
assertEquals(pvtKey, keyPair.getPrivate());
publicKey = omCertClient.getPublicKey();
publicKey = dnCertClient.getPublicKey();
assertNotNull(publicKey);
assertEquals(publicKey, keyPair.getPublic());
}
private KeyPair generateKeyPairFiles() throws Exception {
cleanupOldKeyPair();
KeyPair keyPair = keyGenerator.generateKey();
keyCodec.writePrivateKey(keyPair.getPrivate());
keyCodec.writePublicKey(keyPair.getPublic());
omKeyCodec.writePrivateKey(keyPair.getPrivate());
omKeyCodec.writePublicKey(keyPair.getPublic());
dnKeyCodec.writePrivateKey(keyPair.getPrivate());
dnKeyCodec.writePublicKey(keyPair.getPublic());
return keyPair;
}
private void cleanupOldKeyPair() {
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPrivateKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getPrivateKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
}
/**
* Tests: 1. storeCertificate 2. getCertificate 3. verifyCertificate
*/
@ -130,11 +173,11 @@ private KeyPair generateKeyPairFiles() throws Exception {
public void testCertificateOps() throws Exception {
X509Certificate cert = omCertClient.getCertificate();
assertNull(cert);
omCertClient.storeCertificate(getPEMEncodedString(x509Certificate),
true);
X509Certificate x509Certificate = generateX509Cert(null);
omCertClient.storeCertificate(x509Certificate);
cert = omCertClient.getCertificate();
cert = omCertClient.getCertificate(
x509Certificate.getSerialNumber().toString());
assertNotNull(cert);
assertTrue(cert.getEncoded().length > 0);
assertEquals(cert, x509Certificate);
@ -147,12 +190,17 @@ private X509Certificate generateX509Cert(KeyPair keyPair) throws Exception {
keyPair = generateKeyPairFiles();
}
return KeyStoreTestUtil.generateCertificate("CN=Test", keyPair, 30,
securityConfig.getSignatureAlgo());
omSecurityConfig.getSignatureAlgo());
}
@Test
public void testSignDataStream() throws Exception {
String data = RandomStringUtils.random(100, UTF);
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPrivateKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
// Expect error when there is no private key to sign.
LambdaTestUtils.intercept(IOException.class, "Error while " +
"signing the stream",
@ -171,8 +219,8 @@ public void testSignDataStream() throws Exception {
private void validateHash(byte[] hash, byte[] data)
throws Exception {
Signature rsaSignature =
Signature.getInstance(securityConfig.getSignatureAlgo(),
securityConfig.getProvider());
Signature.getInstance(omSecurityConfig.getSignatureAlgo(),
omSecurityConfig.getProvider());
rsaSignature.initVerify(omCertClient.getPublicKey());
rsaSignature.update(data);
Assert.assertTrue(rsaSignature.verify(hash));
@ -184,8 +232,6 @@ private void validateHash(byte[] hash, byte[] data)
@Test
public void verifySignatureStream() throws Exception {
String data = RandomStringUtils.random(500, UTF);
X509Certificate x509Certificate = generateX509Cert(null);
byte[] sign = omCertClient.signDataStream(IOUtils.toInputStream(data,
UTF));
@ -209,7 +255,6 @@ public void verifySignatureStream() throws Exception {
@Test
public void verifySignatureDataArray() throws Exception {
String data = RandomStringUtils.random(500, UTF);
X509Certificate x509Certificate = generateX509Cert(null);
byte[] sign = omCertClient.signData(data.getBytes());
// Positive tests.
@ -233,6 +278,67 @@ public void queryCertificate() throws Exception {
() -> omCertClient.queryCertificate(""));
}
@Test
public void testCertificateLoadingOnInit() throws Exception {
KeyPair keyPair = keyGenerator.generateKey();
X509Certificate cert1 = generateX509Cert(keyPair);
X509Certificate cert2 = generateX509Cert(keyPair);
X509Certificate cert3 = generateX509Cert(keyPair);
Path certPath = dnSecurityConfig.getCertificateLocation();
CertificateCodec codec = new CertificateCodec(dnSecurityConfig);
// Certificate not found.
LambdaTestUtils.intercept(CertificateException.class, "Error while" +
" getting certificate",
() -> dnCertClient.getCertificate(cert1.getSerialNumber()
.toString()));
LambdaTestUtils.intercept(CertificateException.class, "Error while" +
" getting certificate",
() -> dnCertClient.getCertificate(cert2.getSerialNumber()
.toString()));
LambdaTestUtils.intercept(CertificateException.class, "Error while" +
" getting certificate",
() -> dnCertClient.getCertificate(cert3.getSerialNumber()
.toString()));
codec.writeCertificate(certPath, "1.crt",
getPEMEncodedString(cert1), true);
codec.writeCertificate(certPath, "2.crt",
getPEMEncodedString(cert2), true);
codec.writeCertificate(certPath, "3.crt",
getPEMEncodedString(cert3), true);
// Re instentiate DN client which will load certificates from filesystem.
dnCertClient = new DNCertificateClient(dnSecurityConfig, certSerialId);
assertNotNull(dnCertClient.getCertificate(cert1.getSerialNumber()
.toString()));
assertNotNull(dnCertClient.getCertificate(cert2.getSerialNumber()
.toString()));
assertNotNull(dnCertClient.getCertificate(cert3.getSerialNumber()
.toString()));
}
@Test
public void testStoreCertificate() throws Exception {
KeyPair keyPair = keyGenerator.generateKey();
X509Certificate cert1 = generateX509Cert(keyPair);
X509Certificate cert2 = generateX509Cert(keyPair);
X509Certificate cert3 = generateX509Cert(keyPair);
dnCertClient.storeCertificate(getPEMEncodedString(cert1), true);
dnCertClient.storeCertificate(getPEMEncodedString(cert2), true);
dnCertClient.storeCertificate(getPEMEncodedString(cert3), true);
assertNotNull(dnCertClient.getCertificate(cert1.getSerialNumber()
.toString()));
assertNotNull(dnCertClient.getCertificate(cert2.getSerialNumber()
.toString()));
assertNotNull(dnCertClient.getCertificate(cert3.getSerialNumber()
.toString()));
}
@Test
public void testInitCertAndKeypairValidationFailures() throws Exception {
@ -246,13 +352,23 @@ public void testInitCertAndKeypairValidationFailures() throws Exception {
omClientLog.clearOutput();
// Case 1. Expect failure when keypair validation fails.
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPrivateKeyFileName()).toFile());
keyCodec.writePrivateKey(keyPair.getPrivate());
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPrivateKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getPrivateKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
omKeyCodec.writePrivateKey(keyPair.getPrivate());
omKeyCodec.writePublicKey(keyPair2.getPublic());
dnKeyCodec.writePrivateKey(keyPair.getPrivate());
dnKeyCodec.writePublicKey(keyPair2.getPublic());
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPublicKeyFileName()).toFile());
keyCodec.writePublicKey(keyPair2.getPublic());
// Check for DN.
assertEquals(dnCertClient.init(), FAILURE);
@ -271,15 +387,18 @@ public void testInitCertAndKeypairValidationFailures() throws Exception {
// Case 2. Expect failure when certificate is generated from different
// private key and keypair validation fails.
getCertClient();
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getCertificateFileName()).toFile());
X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
"CN=Test", keyGenerator.generateKey(), 10,
securityConfig.getSignatureAlgo());
CertificateCodec codec = new CertificateCodec(securityConfig);
codec.writeCertificate(new X509CertificateHolder(
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getCertificateFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getCertificateFileName()).toFile());
CertificateCodec omCertCodec = new CertificateCodec(omSecurityConfig);
omCertCodec.writeCertificate(new X509CertificateHolder(
x509Certificate.getEncoded()));
CertificateCodec dnCertCodec = new CertificateCodec(dnSecurityConfig);
dnCertCodec.writeCertificate(new X509CertificateHolder(
x509Certificate.getEncoded()));
// Check for DN.
assertEquals(dnCertClient.init(), FAILURE);
assertTrue(dnClientLog.getOutput().contains("Keypair validation " +
@ -297,10 +416,13 @@ public void testInitCertAndKeypairValidationFailures() throws Exception {
// private key and certificate validation fails.
// Re write the correct public key.
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
getCertClient();
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPublicKeyFileName()).toFile());
keyCodec.writePublicKey(keyPair.getPublic());
omKeyCodec.writePublicKey(keyPair.getPublic());
dnKeyCodec.writePublicKey(keyPair.getPublic());
// Check for DN.
assertEquals(dnCertClient.init(), FAILURE);
@ -318,8 +440,10 @@ public void testInitCertAndKeypairValidationFailures() throws Exception {
// Case 4. Failure when public key recovery fails.
getCertClient();
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPublicKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
.toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
.toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
// Check for DN.
assertEquals(dnCertClient.init(), FAILURE);

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -53,10 +52,10 @@
import java.net.InetAddress;
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
@ -179,7 +178,8 @@ public void start(Object service) {
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
component = "dn-" + datanodeDetails.getUuidString();
dnCertClient = new DNCertificateClient(new SecurityConfig(conf));
dnCertClient = new DNCertificateClient(new SecurityConfig(conf),
datanodeDetails.getCertSerialId());
if (SecurityUtil.getAuthenticationMethod(conf).equals(
UserGroupInformation.AuthenticationMethod.KERBEROS)) {
@ -199,7 +199,11 @@ public void start(Object service) {
}
LOG.info("Hdds Datanode login successful.");
}
datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf);
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
initializeCertificateClient(conf);
}
datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
dnCertClient);
try {
httpServer = new HddsDatanodeHttpServer(conf);
httpServer.start();
@ -209,9 +213,6 @@ public void start(Object service) {
startPlugins();
// Starting HDDS Daemons
datanodeStateMachine.startDaemon();
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
initializeCertificateClient(conf);
}
} catch (IOException e) {
throw new RuntimeException("Can't start the HDDS datanode plugin", e);
} catch (AuthenticationException ex) {
@ -268,10 +269,10 @@ private void getSCMSignedCert(OzoneConfiguration config) {
String pemEncodedCert = secureScmClient.getDataNodeCertificate(
datanodeDetails.getProtoBufMessage(), getEncodedString(csr));
X509Certificate x509Certificate =
CertificateCodec.getX509Certificate(pemEncodedCert);
dnCertClient.storeCertificate(x509Certificate);
dnCertClient.storeCertificate(pemEncodedCert, true);
datanodeDetails.setCertSerialId(getX509Certificate(pemEncodedCert).
getSerialNumber().toString());
persistDatanodeDetails(datanodeDetails);
} catch (IOException | CertificateException e) {
LOG.error("Error while storing SCM signed certificate.", e);
throw new RuntimeException(e);
@ -331,6 +332,29 @@ private DatanodeDetails initializeDatanodeDetails()
}
}
/**
* Persist DatanodeDetails to file system.
* @param dnDetails
*
* @return DatanodeDetails
*/
private void persistDatanodeDetails(DatanodeDetails dnDetails)
throws IOException {
String idFilePath = HddsUtils.getDatanodeIdFilePath(conf);
if (idFilePath == null || idFilePath.isEmpty()) {
LOG.error("A valid file path is needed for config setting {}",
ScmConfigKeys.OZONE_SCM_DATANODE_ID);
throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_DATANODE_ID +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration" +
" for details on configuring Ozone.");
}
Preconditions.checkNotNull(idFilePath);
File idFile = new File(idFilePath);
ContainerUtils.writeDatanodeDetailsTo(dnDetails, idFile);
}
/**
* Starts all the service plugins which are configured using
* OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY.

View File

@ -33,6 +33,7 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerCommandHandler;
@ -82,15 +83,17 @@ public class DatanodeStateMachine implements Closeable {
private final ReplicationSupervisor supervisor;
private JvmPauseMonitor jvmPauseMonitor;
private CertificateClient dnCertClient;
/**
* Constructs a a datanode state machine.
*
* @param datanodeDetails - DatanodeDetails used to identify a datanode
* @param datanodeDetails - DatanodeDetails used to identify a datanode
* @param conf - Configuration.
* @param certClient - Datanode Certificate client, required if security is
* enabled
*/
public DatanodeStateMachine(DatanodeDetails datanodeDetails,
Configuration conf) throws IOException {
Configuration conf, CertificateClient certClient) throws IOException {
this.conf = conf;
this.datanodeDetails = datanodeDetails;
executorService = HadoopExecutors.newCachedThreadPool(
@ -99,7 +102,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
container = new OzoneContainer(this.datanodeDetails,
new OzoneConfiguration(conf), context);
new OzoneConfiguration(conf), context, certClient);
dnCertClient = certClient;
nextHB = new AtomicLong(Time.monotonicNow());
ContainerReplicator replicator =

View File

@ -32,6 +32,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN;
/**
* A server endpoint that acts as the communication layer for Ozone containers.
*/
@ -39,10 +41,12 @@ public abstract class XceiverServer implements XceiverServerSpi {
private final SecurityConfig secConfig;
private final TokenVerifier tokenVerifier;
private final CertificateClient caClient;
public XceiverServer(Configuration conf) {
public XceiverServer(Configuration conf, CertificateClient client) {
Preconditions.checkNotNull(conf);
this.secConfig = new SecurityConfig(conf);
this.caClient = client;
tokenVerifier = new BlockTokenVerifier(secConfig, getCaClient());
}
@ -59,17 +63,15 @@ public void submitRequest(ContainerCommandRequestProto request,
String encodedToken = request.getEncodedToken();
if (encodedToken == null) {
throw new SCMSecurityException("Security is enabled but client " +
"request is missing block token.",
SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN);
"request is missing block token.", MISSING_BLOCK_TOKEN);
}
tokenVerifier.verify(encodedToken, "");
tokenVerifier.verify(encodedToken, encodedToken);
}
}
@VisibleForTesting
protected CertificateClient getCaClient() {
// TODO: instantiate CertificateClient
return null;
return caClient;
}
protected SecurityConfig getSecurityConfig() {

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.common.helpers.
StorageContainerException;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -75,8 +76,9 @@ public final class XceiverServerGrpc extends XceiverServer {
* @param conf - Configuration
*/
public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
ContainerDispatcher dispatcher, BindableService... additionalServices) {
super(conf);
ContainerDispatcher dispatcher, CertificateClient caClient,
BindableService... additionalServices) {
super(conf, caClient);
Preconditions.checkNotNull(conf);
this.id = datanodeDetails.getUuid();

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@ -113,9 +114,9 @@ private static long nextCallId() {
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext
context, GrpcTlsConfig tlsConfig)
context, GrpcTlsConfig tlsConfig, CertificateClient caClient)
throws IOException {
super(conf);
super(conf, caClient);
Objects.requireNonNull(dd, "id == null");
this.port = port;
RaftProperties serverProperties = newRaftProperties(conf);
@ -380,7 +381,8 @@ private RpcType setRpcType(Configuration conf, RaftProperties properties) {
public static XceiverServerRatis newXceiverServerRatis(
DatanodeDetails datanodeDetails, Configuration ozoneConf,
ContainerDispatcher dispatcher, StateContext context) throws IOException {
ContainerDispatcher dispatcher, StateContext context,
CertificateClient caClient) throws IOException {
int localPort = ozoneConf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
@ -406,7 +408,7 @@ public static XceiverServerRatis newXceiverServerRatis(
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context, tlsConfig);
dispatcher, ozoneConf, context, tlsConfig, caClient);
}
@Override

View File

@ -29,6 +29,7 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@ -76,11 +77,13 @@ public class OzoneContainer {
* Construct OzoneContainer object.
* @param datanodeDetails
* @param conf
* @param certClient
* @throws DiskOutOfSpaceException
* @throws IOException
*/
public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
conf, StateContext context) throws IOException {
conf, StateContext context, CertificateClient certClient)
throws IOException {
this.config = conf;
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
this.containerSet = new ContainerSet();
@ -104,9 +107,10 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
*/
this.controller = new ContainerController(containerSet, handlers);
this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
datanodeDetails, config, hddsDispatcher, context);
datanodeDetails, config, hddsDispatcher, context, certClient);
this.readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, createReplicationService());
datanodeDetails, config, hddsDispatcher, certClient,
createReplicationService());
}

View File

@ -73,6 +73,7 @@ public static void setUp() throws Exception {
conf = new OzoneConfiguration();
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
//conf.set(ScmConfigKeys.OZONE_SCM_NAMES, "localhost");
String volumeDir = testDir + "/disk1";
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, volumeDir);
@ -113,8 +114,7 @@ public static void tearDown() {
@Before
public void setUpDNCertClient(){
client = new DNCertificateClient(securityConfig);
service.setCertificateClient(client);
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPrivateKeyFileName()).toFile());
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
@ -123,7 +123,9 @@ public void setUpDNCertClient(){
.getCertificateLocation().toString(),
securityConfig.getCertificateFileName()).toFile());
dnLogs.clearOutput();
client = new DNCertificateClient(securityConfig,
certHolder.getSerialNumber().toString());
service.setCertificateClient(client);
}
@Test

View File

@ -161,7 +161,7 @@ public void tearDown() throws Exception {
public void testStartStopDatanodeStateMachine() throws IOException,
InterruptedException, TimeoutException {
try (DatanodeStateMachine stateMachine =
new DatanodeStateMachine(getNewDatanodeDetails(), conf)) {
new DatanodeStateMachine(getNewDatanodeDetails(), conf, null)) {
stateMachine.startDaemon();
SCMConnectionManager connectionManager =
stateMachine.getConnectionManager();
@ -219,7 +219,7 @@ public void testDatanodeStateContext() throws IOException,
ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
try (DatanodeStateMachine stateMachine =
new DatanodeStateMachine(datanodeDetails, conf)) {
new DatanodeStateMachine(datanodeDetails, conf, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@ -325,7 +325,7 @@ public void testDatanodeStateMachineWithIdWriteFail() throws Exception {
datanodeDetails.setPort(port);
try (DatanodeStateMachine stateMachine =
new DatanodeStateMachine(datanodeDetails, conf)) {
new DatanodeStateMachine(datanodeDetails, conf, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@ -388,7 +388,7 @@ public void testDatanodeStateMachineWithInvalidConfiguration()
perTestConf.setStrings(entry.getKey(), entry.getValue());
LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
getNewDatanodeDetails(), perTestConf)) {
getNewDatanodeDetails(), perTestConf, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,

View File

@ -276,7 +276,7 @@ private OzoneContainer getOzoneContainer(final OzoneConfiguration conf,
.thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(datanodeStateMachine);
final OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, context);
datanodeDetails, conf, context, null);
ozoneContainer.getDispatcher().setScmId(UUID.randomUUID().toString());
return ozoneContainer;
}

View File

@ -98,7 +98,7 @@ public void testBuildContainerMap() throws Exception {
// When OzoneContainer is started, the containers from disk should be
// loaded into the containerSet.
OzoneContainer ozoneContainer = new
OzoneContainer(datanodeDetails, conf, context);
OzoneContainer(datanodeDetails, conf, context, null);
ContainerSet containerset = ozoneContainer.getContainerSet();
assertEquals(10, containerset.containerCount());
}

View File

@ -156,7 +156,7 @@ public void testGetVersionTask() throws Exception {
serverAddress, 1000)) {
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, getContext(datanodeDetails));
datanodeDetails, conf, getContext(datanodeDetails), null);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -181,7 +181,7 @@ public void testCheckVersionResponse() throws Exception {
.captureLogs(VersionEndpointTask.LOG);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, getContext(datanodeDetails));
datanodeDetails, conf, getContext(datanodeDetails), null);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -235,7 +235,7 @@ public void testGetVersionToInvalidEndpoint() throws Exception {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, getContext(datanodeDetails));
datanodeDetails, conf, getContext(datanodeDetails), null);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
EndpointStateMachine.EndPointStates newState = versionTask.call();
@ -263,7 +263,7 @@ public void testGetVersionAssertRpcTimeOut() throws Exception {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, getContext(datanodeDetails));
datanodeDetails, conf, getContext(datanodeDetails), null);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -483,7 +483,7 @@ private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
// Create a datanode state machine for stateConext used by endpoint task
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), conf);
TestUtils.randomDatanodeDetails(), conf, null);
EndpointStateMachine rpcEndPoint =
createEndpoint(conf, scmAddress, rpcTimeout)) {
HddsProtos.DatanodeDetailsProto datanodeDetailsProto =

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.Table;
@ -245,6 +246,13 @@ List<OmVolumeArgs> listVolumes(String userName, String prefix,
*/
Table<String, OmKeyInfo> getOpenKeyTable();
/**
* Gets the DelegationTokenTable.
*
* @return Table.
*/
Table<OzoneTokenIdentifier, Long> getDelegationTokenTable();
/**
* Gets the S3Bucket to Ozone Volume/bucket mapping table.
*

View File

@ -40,7 +40,6 @@
public class S3SecretManagerImpl implements S3SecretManager {
private static final Logger LOG =
LoggerFactory.getLogger(S3SecretManagerImpl.class);
/**
* OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock.
*/
@ -110,4 +109,8 @@ public String getS3UserSecretString(String awsAccessKeyId)
return OzoneManagerProtocolProtos.S3Secret.parseFrom(s3Secret)
.getAwsSecret();
}
public OMMetadataManager getOmMetadataManager() {
return omMetadataManager;
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.codec;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.utils.db.Codec;
import java.io.IOException;
/**
* Codec to encode TokenIdentifierCodec as byte array.
*/
public class TokenIdentifierCodec implements Codec<OzoneTokenIdentifier> {
@Override
public byte[] toPersistedFormat(OzoneTokenIdentifier object) {
Preconditions
.checkNotNull(object, "Null object can't be converted to byte array.");
return object.getBytes();
}
@Override
public OzoneTokenIdentifier fromPersistedFormat(byte[] rawData)
throws IOException {
Preconditions.checkNotNull(rawData,
"Null byte array can't converted to real object.");
try {
return OzoneTokenIdentifier.readProtoBuf(rawData);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Can't encode the the raw data from the byte array", e);
}
}
}

View File

@ -32,8 +32,7 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
/**
* SecretManager for Ozone Master block tokens.
*/
@ -172,7 +171,6 @@ public boolean verifySignature(OzoneBlockTokenIdentifier identifier,
@Override
public synchronized void start(CertificateClient client) throws IOException {
super.start(client);
removeExpiredKeys();
}
/**
@ -191,17 +189,4 @@ private long getTokenExpiryTime() {
public synchronized void stop() throws IOException {
super.stop();
}
private synchronized void removeExpiredKeys() {
// TODO: handle roll private key/certificate
long now = Time.now();
for (Iterator<Map.Entry<Integer, OzoneSecretKey>> it = allKeys.entrySet()
.iterator(); it.hasNext();) {
Map.Entry<Integer, OzoneSecretKey> e = it.next();
OzoneSecretKey key = e.getValue();
if (key.getExpiryDate() < now) {
it.remove();
}
}
}
}

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.security.OzoneSecretStore.OzoneManagerSecretState;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier.TokenInfo;
@ -39,7 +40,6 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.security.PrivateKey;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -61,7 +61,7 @@ public class OzoneDelegationTokenSecretManager
.getLogger(OzoneDelegationTokenSecretManager.class);
private final Map<OzoneTokenIdentifier, TokenInfo> currentTokens;
private final OzoneSecretStore store;
private final S3SecretManager s3SecretManager;
private final S3SecretManagerImpl s3SecretManager;
private Thread tokenRemoverThread;
private final long tokenRemoverScanInterval;
private String omCertificateSerialId;
@ -90,8 +90,9 @@ public OzoneDelegationTokenSecretManager(OzoneConfiguration conf,
service, LOG);
currentTokens = new ConcurrentHashMap();
this.tokenRemoverScanInterval = dtRemoverScanInterval;
this.store = new OzoneSecretStore(conf);
this.s3SecretManager = s3SecretManager;
this.s3SecretManager = (S3SecretManagerImpl) s3SecretManager;
this.store = new OzoneSecretStore(conf,
this.s3SecretManager.getOmMetadataManager());
loadTokenSecretState(store.loadState());
}
@ -129,12 +130,11 @@ public Token<OzoneTokenIdentifier> createToken(Text owner, Text renewer,
byte[] password = createPassword(identifier.getBytes(),
getCurrentKey().getPrivateKey());
addToTokenStore(identifier, password);
long expiryTime = identifier.getIssueDate() + getTokenRenewInterval();
addToTokenStore(identifier, password, expiryTime);
Token<OzoneTokenIdentifier> token = new Token<>(identifier.getBytes(),
password,
identifier.getKind(), getService());
password, identifier.getKind(), getService());
if (LOG.isTraceEnabled()) {
long expiryTime = identifier.getIssueDate() + getTokenRenewInterval();
String tokenId = identifier.toStringStable();
LOG.trace("Issued delegation token -> expiryTime:{},tokenId:{}",
expiryTime, tokenId);
@ -149,10 +149,11 @@ public Token<OzoneTokenIdentifier> createToken(Text owner, Text renewer,
* @param password
* @throws IOException
*/
private void addToTokenStore(OzoneTokenIdentifier identifier, byte[] password)
private void addToTokenStore(OzoneTokenIdentifier identifier,
byte[] password, long renewTime)
throws IOException {
TokenInfo tokenInfo = new TokenInfo(identifier.getIssueDate()
+ getTokenRenewInterval(), password, identifier.getTrackingId());
TokenInfo tokenInfo = new TokenInfo(renewTime, password,
identifier.getTrackingId());
currentTokens.put(identifier, tokenInfo);
store.storeToken(identifier, tokenInfo.getRenewDate());
}
@ -222,20 +223,10 @@ public synchronized long renewToken(Token<OzoneTokenIdentifier> token,
+ " tries to renew a token " + formatTokenId(id)
+ " with non-matching renewer " + id.getRenewer());
}
OzoneSecretKey key = allKeys.get(id.getMasterKeyId());
if (key == null) {
throw new InvalidToken("Unable to find master key for keyId="
+ id.getMasterKeyId()
+ " from cache. Failed to renew an unexpired token "
+ formatTokenId(id) + " with sequenceNumber="
+ id.getSequenceNumber());
}
byte[] password = createPassword(token.getIdentifier(),
key.getPrivateKey());
long renewTime = Math.min(id.getMaxDate(), now + getTokenRenewInterval());
try {
addToTokenStore(id, password);
addToTokenStore(id, token.getPassword(), renewTime);
} catch (IOException e) {
LOG.error("Unable to update token " + id.getSequenceNumber(), e);
}
@ -323,14 +314,8 @@ private TokenInfo validateToken(OzoneTokenIdentifier identifier)
public boolean verifySignature(OzoneTokenIdentifier identifier,
byte[] password) {
try {
if (identifier.getOmCertSerialId().equals(getOmCertificateSerialId())) {
return getCertClient().verifySignature(identifier.getBytes(), password,
getCertClient().getCertificate());
} else {
// TODO: This delegation token was issued by other OM instance. Fetch
// certificate from SCM using certificate serial.
return false;
}
return getCertClient().verifySignature(identifier.getBytes(), password,
getCertClient().getCertificate(identifier.getOmCertSerialId()));
} catch (CertificateException e) {
return false;
}
@ -367,57 +352,25 @@ private byte[] validateS3Token(OzoneTokenIdentifier identifier)
}
// TODO: handle roll private key/certificate
private synchronized void removeExpiredKeys() {
long now = Time.now();
for (Iterator<Map.Entry<Integer, OzoneSecretKey>> it = allKeys.entrySet()
.iterator(); it.hasNext();) {
Map.Entry<Integer, OzoneSecretKey> e = it.next();
OzoneSecretKey key = e.getValue();
if (key.getExpiryDate() < now && key.getExpiryDate() != -1) {
if (!key.equals(getCurrentKey())) {
it.remove();
try {
store.removeTokenMasterKey(key);
} catch (IOException ex) {
LOG.error("Unable to remove master key " + key.getKeyId(), ex);
}
}
}
}
}
private void loadTokenSecretState(
OzoneManagerSecretState<OzoneTokenIdentifier> state) throws IOException {
LOG.info("Loading token state into token manager.");
for (OzoneSecretKey key : state.ozoneManagerSecretState()) {
allKeys.putIfAbsent(key.getKeyId(), key);
incrementCurrentKeyId();
}
for (Map.Entry<OzoneTokenIdentifier, Long> entry :
state.getTokenState().entrySet()) {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
private void addPersistedDelegationToken(
OzoneTokenIdentifier identifier, long renewDate)
throws IOException {
private void addPersistedDelegationToken(OzoneTokenIdentifier identifier,
long renewDate) throws IOException {
if (isRunning()) {
// a safety check
throw new IOException(
"Can't add persisted delegation token to a running SecretManager.");
}
int keyId = identifier.getMasterKeyId();
OzoneSecretKey dKey = allKeys.get(keyId);
if (dKey == null) {
LOG.warn("No KEY found for persisted identifier "
+ formatTokenId(identifier));
return;
}
PrivateKey privateKey = dKey.getPrivateKey();
byte[] password = createPassword(identifier.getBytes(), privateKey);
byte[] password = createPassword(identifier.getBytes(),
getCertClient().getPrivateKey());
if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
setDelegationTokenSeqNum(identifier.getSequenceNumber());
}
@ -437,19 +390,10 @@ private void addPersistedDelegationToken(
public synchronized void start(CertificateClient certClient)
throws IOException {
super.start(certClient);
storeKey(getCurrentKey());
removeExpiredKeys();
tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
tokenRemoverThread.start();
}
private void storeKey(OzoneSecretKey key) throws IOException {
store.storeTokenMasterKey(key);
if (!allKeys.containsKey(key.getKeyId())) {
allKeys.put(key.getKeyId(), key);
}
}
public void stopThreads() {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping expired delegation token remover thread");

View File

@ -36,8 +36,6 @@
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignatureException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -62,8 +60,6 @@ public abstract class OzoneSecretManager<T extends TokenIdentifier>
private OzoneSecretKey currentKey;
private AtomicInteger currentKeyId;
private AtomicInteger tokenSequenceNumber;
@SuppressWarnings("visibilitymodifier")
protected final Map<Integer, OzoneSecretKey> allKeys;
/**
* Create a secret manager.
@ -82,7 +78,6 @@ public OzoneSecretManager(SecurityConfig secureConf, long tokenMaxLifetime,
this.tokenRenewInterval = tokenRenewInterval;
currentKeyId = new AtomicInteger();
tokenSequenceNumber = new AtomicInteger();
allKeys = new ConcurrentHashMap<>();
this.service = service;
this.logger = logger;
}

View File

@ -17,31 +17,16 @@
package org.apache.hadoop.ozone.security;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.utils.db.Table.KeyValue;
import org.apache.hadoop.utils.db.TableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MANAGER_TOKEN_DB_NAME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_MB;
/**
* SecretStore for Ozone Master.
@ -50,13 +35,15 @@ public class OzoneSecretStore implements Closeable {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneSecretStore.class);
private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
private OMMetadataManager omMetadataManager;
@Override
public void close() throws IOException {
if (store != null) {
store.close();
if (omMetadataManager != null) {
try {
omMetadataManager.getDelegationTokenTable().close();
} catch (Exception e) {
throw new IOException("Error while closing OzoneSecretStore.", e);
}
}
}
@ -65,185 +52,64 @@ public void close() throws IOException {
* Support class to maintain state of OzoneSecretStore.
*/
public static class OzoneManagerSecretState<T> {
private Map<T, Long> tokenState = new HashMap<>();
private Set<OzoneSecretKey> tokenMasterKeyState = new HashSet<>();
public Map<T, Long> getTokenState() {
return tokenState;
}
public Set<OzoneSecretKey> ozoneManagerSecretState() {
return tokenMasterKeyState;
}
}
private MetadataStore store;
public OzoneSecretStore(OzoneConfiguration conf)
throws IOException {
File metaDir = getOzoneMetaDirPath(conf);
final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
OZONE_OM_DB_CACHE_SIZE_DEFAULT);
File omTokenDBFile = new File(metaDir.getPath(),
OZONE_MANAGER_TOKEN_DB_NAME);
this.store = MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(omTokenDBFile)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
public OzoneSecretStore(OzoneConfiguration conf,
OMMetadataManager omMetadataManager) {
this.omMetadataManager = omMetadataManager;
}
public OzoneManagerSecretState loadState() throws IOException {
OzoneManagerSecretState state = new OzoneManagerSecretState();
int numKeys = loadMasterKeys(state);
LOG.info("Loaded " + numKeys + " token master keys");
OzoneManagerSecretState<Integer> state = new OzoneManagerSecretState();
int numTokens = loadTokens(state);
LOG.info("Loaded " + numTokens + " tokens");
return state;
}
public void storeTokenMasterKey(OzoneSecretKey key) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing master key " + key.getKeyId());
}
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
key.write(dataStream);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanupWithLogger(LOG, dataStream);
}
try {
byte[] dbKey = getMasterKeyDBKey(key);
store.put(dbKey, memStream.toByteArray());
} catch (IOException e) {
LOG.error("Unable to store master key " + key.getKeyId(), e);
throw e;
}
}
public void removeTokenMasterKey(OzoneSecretKey key)
public void storeToken(OzoneTokenIdentifier tokenId, long renewDate)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing master key " + key.getKeyId());
LOG.debug("Storing token {}", tokenId.getSequenceNumber());
}
byte[] dbKey = getMasterKeyDBKey(key);
try {
store.delete(dbKey);
} catch (IOException e) {
LOG.error("Unable to delete master key " + key.getKeyId(), e);
throw e;
}
}
public void storeToken(OzoneTokenIdentifier tokenId, Long renewDate)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
tokenId.write(dataStream);
dataStream.writeLong(renewDate);
dataStream.close();
dataStream = null;
} finally {
IOUtils.cleanupWithLogger(LOG, dataStream);
}
byte[] dbKey = getTokenDBKey(tokenId);
try {
store.put(dbKey, memStream.toByteArray());
omMetadataManager.getDelegationTokenTable().put(tokenId, renewDate);
} catch (IOException e) {
LOG.error("Unable to store token " + tokenId.toString(), e);
throw e;
}
}
public void updateToken(OzoneTokenIdentifier tokenId, Long renewDate)
public void updateToken(OzoneTokenIdentifier tokenId, long renewDate)
throws IOException {
storeToken(tokenId, renewDate);
}
public void removeToken(OzoneTokenIdentifier tokenId)
throws IOException {
byte[] dbKey = getTokenDBKey(tokenId);
public void removeToken(OzoneTokenIdentifier tokenId) throws IOException {
try {
store.delete(dbKey);
omMetadataManager.getDelegationTokenTable().delete(tokenId);
} catch (IOException e) {
LOG.error("Unable to remove token " + tokenId.toString(), e);
LOG.error("Unable to remove token {}", tokenId.toString(), e);
throw e;
}
}
public int loadMasterKeys(OzoneManagerSecretState state) throws IOException {
MetadataKeyFilters.MetadataKeyFilter filter =
(preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
.startsWith(TOKEN_MASTER_KEY_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs = store
.getRangeKVs(null, Integer.MAX_VALUE, filter);
kvs.forEach(entry -> {
try {
loadTokenMasterKey(state, entry.getValue());
} catch (IOException e) {
LOG.warn("Failed to load master key ",
DFSUtil.bytes2String(entry.getKey()), e);
}
});
return kvs.size();
}
private void loadTokenMasterKey(OzoneManagerSecretState state, byte[] data)
throws IOException {
OzoneSecretKey key = OzoneSecretKey.readProtoBuf(data);
state.tokenMasterKeyState.add(key);
}
public int loadTokens(OzoneManagerSecretState state) throws IOException {
MetadataKeyFilters.MetadataKeyFilter filter =
(preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
.startsWith(TOKEN_STATE_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs =
store.getRangeKVs(null, Integer.MAX_VALUE, filter);
kvs.forEach(entry -> {
try {
loadToken(state, entry.getValue());
} catch (IOException e) {
LOG.warn("Failed to load token ",
DFSUtil.bytes2String(entry.getKey()), e);
int loadedToken = 0;
try (TableIterator<OzoneTokenIdentifier, ? extends
KeyValue<OzoneTokenIdentifier, Long>> iterator =
omMetadataManager.getDelegationTokenTable().iterator()){
iterator.seekToFirst();
while(iterator.hasNext()) {
KeyValue<OzoneTokenIdentifier, Long> kv = iterator.next();
state.tokenState.put(kv.getKey(), kv.getValue());
loadedToken++;
}
});
return kvs.size();
}
private void loadToken(OzoneManagerSecretState state, byte[] data)
throws IOException {
long renewDate;
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
OzoneTokenIdentifier tokenId = OzoneTokenIdentifier.readProtoBuf(in);
try {
tokenId.readFields(in);
renewDate = in.readLong();
} finally {
IOUtils.cleanupWithLogger(LOG, in);
}
state.tokenState.put(tokenId, renewDate);
}
private byte[] getMasterKeyDBKey(OzoneSecretKey masterKey) {
return DFSUtil.string2Bytes(
TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId());
}
private byte[] getTokenDBKey(OzoneTokenIdentifier tokenId) {
return DFSUtil.string2Bytes(
TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber());
return loadedToken;
}
}

View File

@ -51,8 +51,8 @@ services:
ports:
- 9874:9874
environment:
ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
WAITFOR: scm:9876
ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
env_file:
- docker-config
command: ["/opt/hadoop/bin/ozone","om"]

View File

@ -149,5 +149,6 @@ Secure S3 test Failure
Secure S3 test Success
Run Keyword Setup credentials
${output} = Execute aws s3api --endpoint-url ${ENDPOINT_URL} create-bucket --bucket bucket-test123
Should contain ${result} Volume pqrs is not found
${output} = Execute aws s3api --endpoint-url ${ENDPOINT_URL} list-buckets
Should contain ${output} bucket-test123

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ipc.Client;
@ -95,6 +96,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
// Timeout for the cluster to be ready
private int waitForClusterToBeReadyTimeout = 60000; // 1 min
private CertificateClient caClient;
/**
* Creates a new MiniOzoneCluster.
@ -364,7 +366,18 @@ public void startScm() throws IOException {
*/
@Override
public void startHddsDatanodes() {
hddsDatanodes.forEach((datanode) -> datanode.start(null));
hddsDatanodes.forEach((datanode) -> {
datanode.setCertificateClient(getCAClient());
datanode.start(null);
});
}
private CertificateClient getCAClient() {
return this.caClient;
}
private void setCAClient(CertificateClient client) {
this.caClient = client;
}
@ -403,6 +416,7 @@ public MiniOzoneCluster build() throws IOException {
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
hddsDatanodes);
cluster.setCAClient(certClient);
if (startDataNodes) {
cluster.startHddsDatanodes();
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@ -123,6 +124,10 @@ public void testDatanodeIDPersistent() throws Exception {
id2.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 2));
id3.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 3));
// Add certificate serial id.
String certSerialId = "" + RandomUtils.nextLong();
id1.setCertSerialId(certSerialId);
// Write a single ID to the file and read it out
File validIdsFile = new File(WRITE_TMP, "valid-values.id");
validIdsFile.delete();
@ -130,6 +135,7 @@ public void testDatanodeIDPersistent() throws Exception {
DatanodeDetails validId = ContainerUtils.readDatanodeDetailsFrom(
validIdsFile);
assertEquals(validId.getCertSerialId(), certSerialId);
assertEquals(id1, validId);
assertEquals(id1.getProtoBufMessage(), validId.getProtoBufMessage());
@ -169,11 +175,11 @@ public void testContainerRandomPort() throws IOException {
true);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf);
TestUtils.randomDatanodeDetails(), ozoneConf, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf);
TestUtils.randomDatanodeDetails(), ozoneConf, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf)
TestUtils.randomDatanodeDetails(), ozoneConf, null)
) {
HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
@ -192,11 +198,11 @@ public void testContainerRandomPort() throws IOException {
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf);
TestUtils.randomDatanodeDetails(), ozoneConf, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf);
TestUtils.randomDatanodeDetails(), ozoneConf, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf)
TestUtils.randomDatanodeDetails(), ozoneConf, null)
) {
HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));

View File

@ -17,18 +17,6 @@
*/
package org.apache.hadoop.ozone;
import static junit.framework.TestCase.assertNotNull;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_EXPIRED;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.slf4j.event.Level.INFO;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@ -100,6 +88,20 @@
import java.time.temporal.ChronoUnit;
import java.util.Date;
import static junit.framework.TestCase.assertNotNull;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_EXPIRED;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.slf4j.event.Level.INFO;
/**
* Test class to for security enabled Ozone cluster.
@ -138,6 +140,7 @@ public final class TestSecureOzoneCluster {
private Path metaDirPath;
@Rule
public TemporaryFolder folder= new TemporaryFolder();
private String omCertSerialId = "9879877970576";
@Before
public void init() {
@ -375,7 +378,6 @@ public void testSecureOMInitializationFailure() throws Exception {
initSCM();
// Create a secure SCM instance as om client will connect to it
scm = StorageContainerManager.createSCM(null, conf);
setupOm(conf);
conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
"non-existent-user@EXAMPLE.com");
@ -401,7 +403,7 @@ public void testSecureOmInitializationSuccess() throws Exception {
} catch (Exception ex) {
// Expects timeout failure from scmClient in om but om user login via
// kerberos should succeed.
Assert.assertTrue(logs.getOutput().contains("Ozone Manager login"
assertTrue(logs.getOutput().contains("Ozone Manager login"
+ " successful"));
}
}
@ -445,7 +447,7 @@ public void testDelegationToken() throws Exception {
CLIENT_TIMEOUT), RandomStringUtils.randomAscii(5));
// Assert if auth was successful via Kerberos
Assert.assertFalse(logs.getOutput().contains(
assertFalse(logs.getOutput().contains(
"Auth successful for " + username + " (auth:KERBEROS)"));
// Case 1: Test successful delegation token.
@ -454,7 +456,7 @@ public void testDelegationToken() throws Exception {
// Case 2: Test successful token renewal.
long renewalTime = omClient.renewDelegationToken(token);
Assert.assertTrue(renewalTime > 0);
assertTrue(renewalTime > 0);
// Check if token is of right kind and renewer is running om instance
Assert.assertEquals(token.getKind().toString(), "OzoneToken");
@ -483,11 +485,11 @@ public Void run() throws Exception {
});
// Case 3: Test Client can authenticate using token.
Assert.assertFalse(logs.getOutput().contains(
assertFalse(logs.getOutput().contains(
"Auth successful for " + username + " (auth:TOKEN)"));
OzoneTestUtils.expectOmException(VOLUME_NOT_FOUND,
() -> omClient.deleteVolume("vol1"));
Assert.assertTrue(logs.getOutput().contains("Auth successful for "
assertTrue(logs.getOutput().contains("Auth successful for "
+ username + " (auth:TOKEN)"));
// Case 4: Test failure of token renewal.
@ -500,11 +502,11 @@ public Void run() throws Exception {
try {
omClient.renewDelegationToken(token);
} catch (OMException ex) {
Assert.assertTrue(ex.getResult().equals(INVALID_AUTH_METHOD));
assertTrue(ex.getResult().equals(INVALID_AUTH_METHOD));
throw ex;
}
});
Assert.assertTrue(logs.getOutput().contains(
assertTrue(logs.getOutput().contains(
"Auth successful for " + username + " (auth:TOKEN)"));
omLogs.clearOutput();
//testUser.setAuthenticationMethod(AuthMethod.KERBEROS);
@ -522,7 +524,7 @@ public Void run() throws Exception {
// Wait for client to timeout
Thread.sleep(CLIENT_TIMEOUT);
Assert.assertFalse(logs.getOutput().contains("Auth failed for"));
assertFalse(logs.getOutput().contains("Auth failed for"));
// Case 6: Test failure of token cancellation.
// Get Om client, this time authentication using Token will fail as
@ -538,12 +540,12 @@ public Void run() throws Exception {
try {
omClient.cancelDelegationToken(token);
} catch (OMException ex) {
Assert.assertTrue(ex.getResult().equals(TOKEN_ERROR_OTHER));
assertTrue(ex.getResult().equals(TOKEN_ERROR_OTHER));
throw ex;
}
});
Assert.assertTrue(logs.getOutput().contains("Auth failed for"));
assertTrue(logs.getOutput().contains("Auth failed for"));
} finally {
om.stop();
om.join();
@ -600,7 +602,7 @@ public void testDelegationTokenRenewal() throws Exception {
// Renew delegation token
long expiryTime = omClient.renewDelegationToken(token);
Assert.assertTrue(expiryTime > 0);
assertTrue(expiryTime > 0);
omLogs.clearOutput();
// Test failure of delegation renewal
@ -612,7 +614,7 @@ public void testDelegationTokenRenewal() throws Exception {
try {
omClient.renewDelegationToken(token);
} catch (OMException ex) {
Assert.assertTrue(ex.getResult().equals(TOKEN_EXPIRED));
assertTrue(ex.getResult().equals(TOKEN_EXPIRED));
throw ex;
}
});
@ -625,7 +627,7 @@ public void testDelegationTokenRenewal() throws Exception {
LambdaTestUtils.intercept(OMException.class,
"Delegation token renewal failed",
() -> omClient.renewDelegationToken(token2));
Assert.assertTrue(omLogs.getOutput().contains(" with non-matching " +
assertTrue(omLogs.getOutput().contains(" with non-matching " +
"renewer randomService"));
omLogs.clearOutput();
@ -640,7 +642,7 @@ public void testDelegationTokenRenewal() throws Exception {
LambdaTestUtils.intercept(OMException.class,
"Delegation token renewal failed",
() -> omClient.renewDelegationToken(tamperedToken));
Assert.assertTrue(omLogs.getOutput().contains("can't be found in " +
assertTrue(omLogs.getOutput().contains("can't be found in " +
"cache"));
omLogs.clearOutput();
@ -654,6 +656,7 @@ private void setupOm(OzoneConfiguration config) throws Exception {
OMStorage omStore = new OMStorage(config);
omStore.setClusterId("testClusterId");
omStore.setScmId("testScmId");
omStore.setOmCertSerialId(omCertSerialId);
// writes the version file properties
omStore.initialize();
OzoneManager.setTestSecureOmFlag(true);
@ -690,11 +693,11 @@ public void testGetS3Secret() throws Exception {
.getS3Secret("HADOOP/JOHNDOE");
//secret fetched on both attempts must be same
Assert.assertTrue(firstAttempt.getAwsSecret()
assertTrue(firstAttempt.getAwsSecret()
.equals(secondAttempt.getAwsSecret()));
//access key fetched on both attempts must be same
Assert.assertTrue(firstAttempt.getAwsAccessKey()
assertTrue(firstAttempt.getAwsAccessKey()
.equals(secondAttempt.getAwsAccessKey()));
} finally {
@ -704,6 +707,52 @@ public void testGetS3Secret() throws Exception {
}
}
/**
* Tests functionality to init secure OM when it is already initialized.
*/
@Test
public void testSecureOmReInit() throws Exception {
LogCapturer omLogs =
LogCapturer.captureLogs(OzoneManager.getLogger());
omLogs.clearOutput();
initSCM();
try {
scm = StorageContainerManager.createSCM(null, conf);
scm.start();
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, false);
OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore);
OzoneManager.setTestSecureOmFlag(true);
om = OzoneManager.createOm(null, conf);
assertNull(om.getCertificateClient());
assertFalse(omLogs.getOutput().contains("Init response: GETCERT"));
assertFalse(omLogs.getOutput().contains("Successfully stored " +
"SCM signed certificate"));
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
OzoneManager.omInit(conf);
om.stop();
om = OzoneManager.createOm(null, conf);
Assert.assertNotNull(om.getCertificateClient());
Assert.assertNotNull(om.getCertificateClient().getPublicKey());
Assert.assertNotNull(om.getCertificateClient().getPrivateKey());
Assert.assertNotNull(om.getCertificateClient().getCertificate());
assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
assertTrue(omLogs.getOutput().contains("Successfully stored " +
"SCM signed certificate"));
X509Certificate certificate = om.getCertificateClient().getCertificate();
validateCertificate(certificate);
} finally {
if (scm != null) {
scm.stop();
}
}
}
/**
* Test functionality to get SCM signed certificate for OM.
*/
@ -726,8 +775,8 @@ public void testSecureOmInitSuccess() throws Exception {
Assert.assertNotNull(om.getCertificateClient().getPublicKey());
Assert.assertNotNull(om.getCertificateClient().getPrivateKey());
Assert.assertNotNull(om.getCertificateClient().getCertificate());
Assert.assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
Assert.assertTrue(omLogs.getOutput().contains("Successfully stored " +
assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
assertTrue(omLogs.getOutput().contains("Successfully stored " +
"SCM signed certificate"));
X509Certificate certificate = om.getCertificateClient().getCertificate();
validateCertificate(certificate);
@ -761,17 +810,17 @@ public void validateCertificate(X509Certificate cert) throws Exception {
// Make sure the end date is honored.
invalidDate = java.sql.Date.valueOf(today.plus(1, ChronoUnit.DAYS));
Assert.assertTrue(cert.getNotAfter().after(invalidDate));
assertTrue(cert.getNotAfter().after(invalidDate));
invalidDate = java.sql.Date.valueOf(today.plus(400, ChronoUnit.DAYS));
Assert.assertTrue(cert.getNotAfter().before(invalidDate));
assertTrue(cert.getNotAfter().before(invalidDate));
Assert.assertTrue(cert.getSubjectDN().toString().contains(scmId));
Assert.assertTrue(cert.getSubjectDN().toString().contains(clusterId));
assertTrue(cert.getSubjectDN().toString().contains(scmId));
assertTrue(cert.getSubjectDN().toString().contains(clusterId));
Assert.assertTrue(cert.getIssuerDN().toString().contains(scmUser));
Assert.assertTrue(cert.getIssuerDN().toString().contains(scmId));
Assert.assertTrue(cert.getIssuerDN().toString().contains(clusterId));
assertTrue(cert.getIssuerDN().toString().contains(scmUser));
assertTrue(cert.getIssuerDN().toString().contains(scmId));
assertTrue(cert.getIssuerDN().toString().contains(clusterId));
// Verify that certificate matches the public key.
String encodedKey1 = cert.getPublicKey().toString();

View File

@ -27,7 +27,6 @@
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyPair;
import java.security.PrivateKey;
@ -48,13 +47,28 @@ public class CertificateClientTestImpl implements CertificateClient {
private final SecurityConfig securityConfig;
private final KeyPair keyPair;
private final Configuration config;
private final X509Certificate x509Certificate;
public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception{
public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception {
securityConfig = new SecurityConfig(conf);
HDDSKeyGenerator keyGen =
new HDDSKeyGenerator(securityConfig.getConfiguration());
keyPair = keyGen.generateKey();
config = conf;
SelfSignedCertificate.Builder builder =
SelfSignedCertificate.newBuilder()
.setBeginDate(LocalDate.now())
.setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
.setClusterID("cluster1")
.setKey(keyPair)
.setSubject("TestCertSub")
.setConfiguration(config)
.setScmID("TestScmId1")
.makeCA();
X509CertificateHolder certificateHolder = null;
certificateHolder = builder.build();
x509Certificate = new JcaX509CertificateConverter().getCertificate(
certificateHolder);
}
@Override
@ -67,26 +81,21 @@ public PublicKey getPublicKey() {
return keyPair.getPublic();
}
/**
* Returns the certificate of the specified component if it exists on the
* local system.
*
* @return certificate or Null if there is no data.
*/
@Override
public X509Certificate getCertificate(String certSerialId)
throws CertificateException {
return x509Certificate;
}
@Override
public X509Certificate getCertificate() {
SelfSignedCertificate.Builder builder =
SelfSignedCertificate.newBuilder()
.setBeginDate(LocalDate.now())
.setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
.setClusterID("cluster1")
.setKey(keyPair)
.setSubject("TestCertSub")
.setConfiguration(config)
.setScmID("TestScmId1")
.makeCA();
X509CertificateHolder certificateHolder = null;
try {
certificateHolder = builder.build();
return new JcaX509CertificateConverter().getCertificate(
certificateHolder);
} catch (IOException | java.security.cert.CertificateException e) {
}
return null;
return x509Certificate;
}
@Override
@ -107,13 +116,13 @@ public byte[] signData(byte[] data) throws CertificateException {
@Override
public boolean verifySignature(InputStream stream, byte[] signature,
X509Certificate x509Certificate) throws CertificateException {
X509Certificate cert) throws CertificateException {
return true;
}
@Override
public boolean verifySignature(byte[] data, byte[] signature,
X509Certificate x509Certificate) throws CertificateException {
X509Certificate cert) throws CertificateException {
return true;
}
@ -128,7 +137,7 @@ public X509Certificate queryCertificate(String query) {
}
@Override
public void storeCertificate(X509Certificate certificate)
public void storeCertificate(String cert, boolean force)
throws CertificateException {
}

View File

@ -23,10 +23,13 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
@ -41,16 +44,21 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.UUID;
@ -71,6 +79,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
private static final String SCM_ID = UUID.randomUUID().toString();
private static File testDir;
private static OzoneConfiguration conf;
private static OzoneBlockTokenSecretManager secretManager;
/**
* Create a MiniOzoneCluster for testing.
@ -96,6 +105,14 @@ public static void init() throws Exception {
.setScmId(SCM_ID)
.setCertificateClient(certificateClientTest)
.build();
String user = UserGroupInformation.getCurrentUser().getShortUserName();
secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
60 *60, certificateClientTest.getCertificate().
getSerialNumber().toString());
secretManager.start(certificateClientTest);
Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
user, EnumSet.allOf(AccessModeProto.class), 60*60);
UserGroupInformation.getCurrentUser().addToken(token);
cluster.getOzoneManager().startSecretManager();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);
@ -163,6 +180,7 @@ public void testPutKeySuccessWithBlockToken() throws Exception {
* 2. writeChunk
* */
@Test
@Ignore("Needs to be moved out of this class as client setup is static")
public void testKeyOpFailureWithoutBlockToken() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
@ -176,7 +194,7 @@ public void testKeyOpFailureWithoutBlockToken() throws Exception {
for (int i = 0; i < 10; i++) {
String keyName = UUID.randomUUID().toString();
try(OzoneOutputStream out = bucket.createKey(keyName,
try (OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, new HashMap<>())) {
LambdaTestUtils.intercept(IOException.class, "UNAUTHENTICATED: Fail " +

View File

@ -376,6 +376,20 @@ public static ContainerCommandRequestProto getCreateContainerRequest(
return getContainerCommandRequestBuilder(containerID, pipeline).build();
}
/**
* Returns a create container command with token. There are a bunch of
* tests where we need to just send a request and get a reply.
*
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
long containerID, Pipeline pipeline, Token token) throws IOException {
LOG.trace("addContainer: {}", containerID);
return getContainerCommandRequestBuilder(containerID, pipeline)
.setEncodedToken(token.encodeToUrlString())
.build();
}
private static Builder getContainerCommandRequestBuilder(long containerID,
Pipeline pipeline) throws IOException {
Builder request =

View File

@ -158,7 +158,7 @@ static XceiverServerRatis newXceiverServerRatis(
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
null);
null, null);
}
private static class TestContainerDispatcher implements ContainerDispatcher {

View File

@ -111,7 +111,7 @@ public void testContainerMetrics() throws Exception {
volumeSet, handlers, context, metrics);
dispatcher.setScmId(UUID.randomUUID().toString());
server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null,
createReplicationService(new ContainerController(
containerSet, handlers)));
client = new XceiverClientGrpc(pipeline, conf);

View File

@ -81,7 +81,7 @@ public void testCreateOzoneContainer() throws Exception {
DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(dsm);
container = new OzoneContainer(datanodeDetails, conf, context);
container = new OzoneContainer(datanodeDetails, conf, context, null);
//Setting scmId, as we start manually ozone container.
container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start();

View File

@ -154,7 +154,7 @@ public void testCreateOzoneContainer() throws Exception {
conf.setBoolean(
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
container = new OzoneContainer(dn, conf, getContext(dn));
container = new OzoneContainer(dn, conf, getContext(dn), null);
//Setting scmId, as we start manually ozone container.
container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start();

View File

@ -18,15 +18,17 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
@ -34,7 +36,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
@ -52,7 +54,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
@ -61,6 +62,7 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -85,6 +87,8 @@ public class TestSecureOzoneContainer {
private Boolean requireBlockToken;
private Boolean hasBlockToken;
private Boolean blockTokeExpired;
private CertificateClientTestImpl caClient;
private OzoneBlockTokenSecretManager secretManager;
public TestSecureOzoneContainer(Boolean requireBlockToken,
@ -105,14 +109,16 @@ public static Collection<Object[]> blockTokenOptions() {
}
@Before
public void setup() throws IOException{
public void setup() throws Exception {
conf = new OzoneConfiguration();
String ozoneMetaPath =
GenericTestUtils.getTempPath("ozoneMeta");
conf.set(OZONE_METADATA_DIRS, ozoneMetaPath);
secConfig = new SecurityConfig(conf);
caClient = new CertificateClientTestImpl(conf);
secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
60 * 60 * 24, caClient.getCertificate().
getSerialNumber().toString());
}
@Test
@ -136,7 +142,7 @@ public void testCreateOzoneContainer() throws Exception {
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
DatanodeDetails dn = TestUtils.randomDatanodeDetails();
container = new OzoneContainer(dn, conf, getContext(dn));
container = new OzoneContainer(dn, conf, getContext(dn), caClient);
//Setting scmId, as we start manually ozone container.
container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start();
@ -148,54 +154,47 @@ public void testCreateOzoneContainer() throws Exception {
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
"testUser", "cid:lud:bcsid",
EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
EnumSet.allOf(AccessModeProto.class),
expiryDate, "1234", 128L);
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) {
port = secConfig.getConfiguration().getInt(OzoneConfigKeys
.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
.DFS_CONTAINER_IPC_PORT, DFS_CONTAINER_IPC_PORT_DEFAULT);
}
InetSocketAddress addr =
new InetSocketAddress(dn.getIpAddress(), port);
Token<OzoneBlockTokenIdentifier> token =
new Token(tokenId.getBytes(), new byte[50], tokenId.getKind(),
SecurityUtil.buildTokenService(addr));
secretManager.start(caClient);
Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
"123", EnumSet.allOf(AccessModeProto.class), RandomUtils.nextLong());
if (hasBlockToken) {
ugi.addToken(token);
}
ugi.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
try {
XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
client.connect(token.encodeToUrlString());
if (hasBlockToken) {
createContainerForTesting(client, containerID, token);
} else {
createContainerForTesting(client, containerID, null);
}
} catch (Exception e) {
if (requireBlockToken && hasBlockToken && !blockTokeExpired) {
LOG.error("Unexpected error. ", e);
fail("Client with BlockToken should succeed when block token is" +
" required.");
}
if (requireBlockToken && hasBlockToken && blockTokeExpired) {
assertTrue("Receive expected exception",
e instanceof SCMSecurityException);
}
if (requireBlockToken && !hasBlockToken) {
assertTrue("Receive expected exception", e instanceof
IOException);
}
ugi.doAs((PrivilegedAction<Void>) () -> {
try {
XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
client.connect(token.encodeToUrlString());
if (hasBlockToken) {
createContainerForTesting(client, containerID, token);
} else {
createContainerForTesting(client, containerID, null);
}
} catch (Exception e) {
if (requireBlockToken && hasBlockToken && !blockTokeExpired) {
LOG.error("Unexpected error. ", e);
fail("Client with BlockToken should succeed when block token is" +
" required.");
}
if (requireBlockToken && hasBlockToken && blockTokeExpired) {
assertTrue("Receive expected exception",
e instanceof SCMSecurityException);
}
if (requireBlockToken && !hasBlockToken) {
assertTrue("Receive expected exception", e instanceof
IOException);
}
return null;
}
return null;
});
} finally {
if (container != null) {

View File

@ -21,6 +21,9 @@
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@ -82,6 +85,7 @@ public class TestContainerServer {
static final String TEST_DIR = GenericTestUtils.getTestDir("dfs")
.getAbsolutePath() + File.separator;
private static final OzoneConfiguration CONF = new OzoneConfiguration();
private static CertificateClient caClient;
private GrpcReplicationService createReplicationService(
ContainerController containerController) {
@ -92,6 +96,7 @@ private GrpcReplicationService createReplicationService(
@BeforeClass
static public void setup() {
CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
caClient = new DNCertificateClient(new SecurityConfig(CONF));
}
@Test
@ -106,7 +111,7 @@ public void testClientServer() throws Exception {
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
XceiverClientGrpc::new,
(dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
new TestContainerDispatcher(),
new TestContainerDispatcher(), caClient,
createReplicationService(controller)), (dn, p) -> {
});
}
@ -137,7 +142,7 @@ static XceiverServerRatis newXceiverServerRatis(
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis
.newXceiverServerRatis(dn, conf, dispatcher, null);
.newXceiverServerRatis(dn, conf, dispatcher, null, caClient);
}
static void runTestClientServerRatis(RpcType rpc, int numNodes)
@ -229,7 +234,7 @@ public void testClientServerWithContainerDispatcher() throws Exception {
dispatcher.init();
server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
createReplicationService(
caClient, createReplicationService(
new ContainerController(containerSet, null)));
client = new XceiverClientGrpc(pipeline, conf);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.server;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -30,9 +31,12 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -44,10 +48,14 @@
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.Assert;
@ -58,13 +66,19 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getCreateContainerRequest;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getTestContainerID;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
import static org.junit.Assert.assertEquals;
/**
* Test Container servers when security is enabled.
@ -73,6 +87,7 @@ public class TestSecureContainerServer {
static final String TEST_DIR
= GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
private static final OzoneConfiguration CONF = new OzoneConfiguration();
private static CertificateClientTestImpl caClient;
private GrpcReplicationService createReplicationService(
ContainerController containerController) {
@ -81,10 +96,11 @@ private GrpcReplicationService createReplicationService(
}
@BeforeClass
static public void setup() {
static public void setup() throws Exception {
CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
CONF.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
CONF.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
caClient = new CertificateClientTestImpl(CONF);
}
@Test
@ -99,7 +115,7 @@ public void testClientServer() throws Exception {
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
XceiverClientGrpc::new,
(dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
new TestContainerDispatcher(),
new TestContainerDispatcher(), caClient,
createReplicationService(controller)), (dn, p) -> {
});
}
@ -131,7 +147,7 @@ static XceiverServerRatis newXceiverServerRatis(
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis
.newXceiverServerRatis(dn, conf, dispatcher, null);
.newXceiverServerRatis(dn, conf, dispatcher, null, caClient);
}
static void runTestClientServerRatis(RpcType rpc, int numNodes)
@ -173,28 +189,50 @@ static void runTestClientServer(
// Test 1: Test failure in request without block token.
final ContainerCommandRequestProto request =
ContainerTestHelper
.getCreateContainerRequest(
ContainerTestHelper.getTestContainerID(), pipeline);
getCreateContainerRequest(
getTestContainerID(), pipeline);
Assert.assertNotNull(request.getTraceID());
XceiverClientSpi finalClient = client;
LambdaTestUtils.intercept(IOException.class,
() -> ContainerProtocolCalls
.validateContainerResponse(finalClient.sendCommand(request)));
// Validation is different for grpc and ratis client.
if(client instanceof XceiverClientGrpc) {
LambdaTestUtils.intercept(SCMSecurityException.class, "Failed to" +
" authenticate with GRPC XceiverServer with Ozone block token",
() -> finalClient.sendCommand(request));
} else {
ContainerCommandResponseProto response = finalClient.
sendCommand(request);
assertEquals(BLOCK_TOKEN_VERIFICATION_FAILED, response.getResult());
}
// Test 2: Test success in request with valid block token.
long expiryTime = Time.monotonicNow() + 60 * 60 * 24;
String omCertSerialId =
caClient.getCertificate().getSerialNumber().toString();
OzoneBlockTokenSecretManager secretManager =
new OzoneBlockTokenSecretManager(new SecurityConfig(CONF),
expiryTime, omCertSerialId);
secretManager.start(caClient);
Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken("1",
EnumSet.allOf(AccessModeProto.class), RandomUtils.nextLong());
final ContainerCommandRequestProto request2 =
ContainerTestHelper
.getCreateContainerSecureRequest(
ContainerTestHelper.getTestContainerID(), pipeline,
new Token<>());
getTestContainerID(), pipeline,
token);
Assert.assertNotNull(request2.getTraceID());
XceiverClientSpi finalClient2 = createClient.apply(pipeline, CONF);
if(finalClient2 instanceof XceiverClientGrpc) {
finalClient2.connect(token.encodeToUrlString());
} else {
finalClient2.connect();
}
XceiverClientSpi finalClient2 = client;
LambdaTestUtils.intercept(IOException.class, "",
() -> ContainerProtocolCalls
.validateContainerResponse(finalClient2.sendCommand(request)));
ContainerCommandRequestProto request3 = getCreateContainerRequest(
getTestContainerID(), pipeline, token);
ContainerCommandResponseProto resp = finalClient2.sendCommand(request3);
assertEquals(SUCCESS, resp.getResult());
} finally {
if (client != null) {
client.close();

View File

@ -46,6 +46,7 @@
import java.util.UUID;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
@ -83,6 +84,7 @@ public void init() throws Exception {
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
conf.set(OZONE_SCM_NAMES, "localhost");
final String path = getTempPath(UUID.randomUUID().toString());
metaDir = Paths.get(path, "om-meta");
@ -175,7 +177,6 @@ public void testSecureOmInitFailures() throws Exception {
omLogs.clearOutput();
// Case 5: When only certificate is present.
client = new OMCertificateClient(securityConfig);
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPublicKeyFileName()).toFile());
CertificateCodec certCodec = new CertificateCodec(securityConfig);
@ -184,6 +185,9 @@ public void testSecureOmInitFailures() throws Exception {
securityConfig.getSignatureAlgo());
certCodec.writeCertificate(new X509CertificateHolder(
x509Certificate.getEncoded()));
client = new OMCertificateClient(securityConfig,
x509Certificate.getSerialNumber().toString());
omStorage.setOmCertSerialId(x509Certificate.getSerialNumber().toString());
LambdaTestUtils.intercept(RuntimeException.class, " OM security" +
" initialization failed",
() -> OzoneManager.initializeSecurity(conf, omStorage));
@ -194,7 +198,8 @@ public void testSecureOmInitFailures() throws Exception {
omLogs.clearOutput();
// Case 6: When private key and certificate is present.
client = new OMCertificateClient(securityConfig);
client = new OMCertificateClient(securityConfig,
x509Certificate.getSerialNumber().toString());
FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
.toString(), securityConfig.getPublicKeyFileName()).toFile());
keyCodec.writePrivateKey(privateKey);
@ -206,7 +211,8 @@ public void testSecureOmInitFailures() throws Exception {
omLogs.clearOutput();
// Case 7 When keypair and certificate is present.
client = new OMCertificateClient(securityConfig);
client = new OMCertificateClient(securityConfig,
x509Certificate.getSerialNumber().toString());
OzoneManager.initializeSecurity(conf, omStorage);
Assert.assertNotNull(client.getPrivateKey());
Assert.assertNotNull(client.getPublicKey());

View File

@ -36,6 +36,7 @@ public class OMStorage extends Storage {
public static final String STORAGE_DIR = "om";
public static final String OM_ID = "omUuid";
public static final String OM_CERT_SERIAL_ID = "omCertSerialId";
/**
* Construct OMStorage.
@ -53,6 +54,10 @@ public void setScmId(String scmId) throws IOException {
}
}
public void setOmCertSerialId(String certSerialId) throws IOException {
getStorageInfo().setProperty(OM_CERT_SERIAL_ID, certSerialId);
}
public void setOmId(String omId) throws IOException {
if (getState() == StorageState.INITIALIZED) {
throw new IOException("OM is already initialized.");
@ -77,6 +82,14 @@ public String getOmId() {
return getStorageInfo().getProperty(OM_ID);
}
/**
* Retrieves the serial id of certificate issued by SCM.
* @return OM_ID
*/
public String getOmCertSerialId() {
return getStorageInfo().getProperty(OM_CERT_SERIAL_ID);
}
@Override
protected Properties getNodeProperties() {
String omId = getOmId();
@ -85,6 +98,10 @@ protected Properties getNodeProperties() {
}
Properties omProperties = new Properties();
omProperties.setProperty(OM_ID, omId);
if (getOmCertSerialId() != null) {
omProperties.setProperty(OM_CERT_SERIAL_ID, getOmCertSerialId());
}
return omProperties;
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmMultipartKeyInfoCodec;
import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
import org.apache.hadoop.ozone.om.codec.TokenIdentifierCodec;
import org.apache.hadoop.ozone.om.codec.VolumeListCodec;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@ -41,6 +42,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.DBStoreBuilder;
import org.apache.hadoop.utils.db.Table;
@ -91,6 +93,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
* |-------------------------------------------------------------------|
* | s3SecretTable | s3g_access_key_id -> s3Secret |
* |-------------------------------------------------------------------|
* | dTokenTable | s3g_access_key_id -> s3Secret |
* |-------------------------------------------------------------------|
*/
private static final String USER_TABLE = "userTable";
@ -102,6 +106,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
private static final String S3_TABLE = "s3Table";
private static final String MULTIPARTINFO_TABLE = "multipartInfoTable";
private static final String S3_SECRET_TABLE = "s3SecretTable";
private static final String DELEGATION_TOKEN_TABLE = "dTokenTable";
private DBStore store;
@ -117,6 +122,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
private Table s3Table;
private Table<String, OmMultipartKeyInfo> multipartInfoTable;
private Table s3SecretTable;
private Table dTokenTable;
public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
this.lock = new OzoneManagerLock(conf);
@ -131,6 +137,10 @@ public Table<String, VolumeList> getUserTable() {
return userTable;
}
public Table<OzoneTokenIdentifier, Long> getDelegationTokenTable() {
return dTokenTable;
}
@Override
public Table<String, OmVolumeArgs> getVolumeTable() {
return volumeTable;
@ -200,6 +210,8 @@ public void start(OzoneConfiguration configuration) throws IOException {
.addTable(S3_TABLE)
.addTable(MULTIPARTINFO_TABLE)
.addTable(S3_SECRET_TABLE)
.addTable(DELEGATION_TOKEN_TABLE)
.addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
.addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
.addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
@ -234,6 +246,10 @@ public void start(OzoneConfiguration configuration) throws IOException {
s3Table = this.store.getTable(S3_TABLE);
checkTableStatus(s3Table, S3_TABLE);
dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
OzoneTokenIdentifier.class, Long.class);
checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
String.class, OmMultipartKeyInfo.class);
checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);

View File

@ -26,9 +26,8 @@
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.util.Collection;
import java.util.Objects;
@ -221,7 +220,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private static boolean securityEnabled = false;
private OzoneDelegationTokenSecretManager delegationTokenMgr;
private OzoneBlockTokenSecretManager blockTokenMgr;
private KeyPair keyPair;
private CertificateClient certClient;
private static boolean testSecureOmFlag = false;
private final Text omRpcAddressTxt;
@ -325,13 +323,19 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
volumeManager, bucketManager);
if (secConfig.isSecurityEnabled()) {
omComponent = OM_DAEMON + "-" + omId;
certClient = new OMCertificateClient(new SecurityConfig(conf));
if(omStorage.getOmCertSerialId() == null) {
throw new RuntimeException("OzoneManager started in secure mode but " +
"doesn't have SCM signed certificate.");
}
certClient = new OMCertificateClient(new SecurityConfig(conf),
omStorage.getOmCertSerialId());
s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
if (secConfig.isBlockTokenEnabled()) {
blockTokenMgr = createBlockTokenSecretManager(configuration);
}
omRpcServer = getRpcServer(conf);
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
@ -693,8 +697,6 @@ private void readKeyPair() throws OzoneSecurityException {
Objects.requireNonNull(pubKey);
Objects.requireNonNull(pvtKey);
Objects.requireNonNull(certClient.getCertificate());
keyPair = new KeyPair(pubKey, pvtKey);
} catch (Exception e) {
throw new OzoneSecurityException("Error reading keypair & certificate "
+ "OzoneManager.", e, OzoneSecurityException
@ -950,7 +952,7 @@ private static OzoneManager createOm(String[] argv,
* accessible
*/
@VisibleForTesting
static boolean omInit(OzoneConfiguration conf) throws IOException {
public static boolean omInit(OzoneConfiguration conf) throws IOException {
OMStorage omStorage = new OMStorage(conf);
StorageState state = omStorage.getState();
if (state != StorageState.INITIALIZED) {
@ -966,22 +968,27 @@ static boolean omInit(OzoneConfiguration conf) throws IOException {
}
omStorage.setClusterId(clusterId);
omStorage.setScmId(scmId);
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
initializeSecurity(conf, omStorage);
}
omStorage.initialize();
System.out.println(
"OM initialization succeeded.Current cluster id for sd="
+ omStorage.getStorageDir() + ";cid=" + omStorage
.getClusterID());
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
initializeSecurity(conf, omStorage);
}
return true;
} catch (IOException ioe) {
LOG.error("Could not initialize OM version file", ioe);
return false;
}
} else {
if(OzoneSecurityUtil.isSecurityEnabled(conf) &&
omStorage.getOmCertSerialId() == null) {
LOG.info("OM storage is already initialized. Initializing security");
initializeSecurity(conf, omStorage);
omStorage.persistCurrentState();
}
System.out.println(
"OM already initialized.Reusing existing cluster id for sd="
+ omStorage.getStorageDir() + ";cid=" + omStorage
@ -1000,7 +1007,8 @@ public static void initializeSecurity(OzoneConfiguration conf,
LOG.info("Initializing secure OzoneManager.");
CertificateClient certClient =
new OMCertificateClient(new SecurityConfig(conf));
new OMCertificateClient(new SecurityConfig(conf),
omStore.getOmCertSerialId());
CertificateClient.InitResponse response = certClient.init();
LOG.info("Init response: {}", response);
switch (response) {
@ -1313,7 +1321,9 @@ public void stop() {
isOmRpcServerRunning = false;
keyManager.stop();
stopSecretManager();
httpServer.stop();
if (httpServer != null) {
httpServer.stop();
}
metadataManager.stop();
metrics.unRegister();
unregisterMXBean();
@ -1397,9 +1407,10 @@ private static void getSCMSignedCert(CertificateClient client,
getEncodedString(csr));
try {
X509Certificate x509Certificate =
CertificateCodec.getX509Cert(pemEncodedCert);
client.storeCertificate(x509Certificate);
client.storeCertificate(pemEncodedCert, true);
// Persist om cert serial id.
omStore.setOmCertSerialId(CertificateCodec.
getX509Certificate(pemEncodedCert).getSerialNumber().toString());
} catch (IOException | CertificateException e) {
LOG.error("Error while storing SCM signed certificate.", e);
throw new RuntimeException(e);

View File

@ -18,26 +18,29 @@
package org.apache.hadoop.ozone.security;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
@ -63,18 +66,18 @@ public class TestOzoneDelegationTokenSecretManager {
private long expiryTime;
private Text serviceRpcAdd;
private OzoneConfiguration conf;
private static final String BASEDIR = GenericTestUtils.getTempPath(
TestOzoneDelegationTokenSecretManager.class.getSimpleName());
private final static Text TEST_USER = new Text("testUser");
private long tokenMaxLifetime = 1000 * 20;
private long tokenRemoverScanInterval = 1000 * 20;
private S3SecretManager s3SecretManager;
private String s3Secret = "dbaksbzljandlkandlsd";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, BASEDIR);
conf = createNewTestPath();
securityConfig = new SecurityConfig(conf);
certificateClient = setupCertificateClient();
certificateClient.init();
@ -83,7 +86,8 @@ public void setUp() throws Exception {
final Map<String, String> s3Secrets = new HashMap<>();
s3Secrets.put("testuser1", s3Secret);
s3Secrets.put("abc", "djakjahkd");
s3SecretManager = new S3SecretManager() {
OMMetadataManager metadataManager = new OmMetadataManagerImpl(conf);
s3SecretManager = new S3SecretManagerImpl(conf, metadataManager) {
@Override
public S3SecretValue getS3Secret(String kerberosID) {
if(s3Secrets.containsKey(kerberosID)) {
@ -102,6 +106,16 @@ public String getS3UserSecretString(String awsAccessKey) {
};
}
private OzoneConfiguration createNewTestPath() throws IOException {
OzoneConfiguration config = new OzoneConfiguration();
File newFolder = folder.newFolder();
if (!newFolder.exists()) {
Assert.assertTrue(newFolder.mkdirs());
}
ServerUtils.setOzoneMetaDirPath(config, newFolder.toString());
return config;
}
/**
* Helper function to create certificate client.
* */
@ -125,13 +139,17 @@ public PrivateKey getPrivateKey() {
public PublicKey getPublicKey() {
return keyPair.getPublic();
}
@Override
public X509Certificate getCertificate(String serialId) {
return cert;
}
};
}
@After
public void tearDown() throws IOException {
secretManager.stop();
FileUtils.deleteQuietly(new File(BASEDIR));
}
@Test
@ -140,8 +158,7 @@ public void testCreateToken() throws Exception {
expiryTime, tokenRemoverScanInterval);
secretManager.start(certificateClient);
Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
TEST_USER,
TEST_USER);
TEST_USER, TEST_USER);
OzoneTokenIdentifier identifier =
OzoneTokenIdentifier.readProtoBuf(token.getIdentifier());
// Check basic details.
@ -276,8 +293,7 @@ public void testVerifySignatureFailure() throws Exception {
id.setOmCertSerialId("1927393");
id.setMaxDate(Time.now() + 60*60*24);
id.setOwner(new Text("test"));
Assert.assertFalse(secretManager.verifySignature(id,
certificateClient.signData(id.getBytes())));
Assert.assertFalse(secretManager.verifySignature(id, id.getBytes()));
}
@Test