HDDS-2020. Remove mTLS from Ozone GRPC. Contributed by Xiaoyu Yao.

Signed-off-by: Anu Engineer <aengineer@apache.org>
This commit is contained in:
Xiaoyu Yao 2019-08-28 08:56:33 -07:00 committed by Anu Engineer
parent 9be448b336
commit d072d3304c
48 changed files with 405 additions and 411 deletions

View File

@ -52,8 +52,8 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -80,6 +80,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private boolean closed = false; private boolean closed = false;
private SecurityConfig secConfig; private SecurityConfig secConfig;
private final boolean topologyAwareRead; private final boolean topologyAwareRead;
private X509Certificate caCert;
/** /**
* Constructs a client that can communicate with the Container framework on * Constructs a client that can communicate with the Container framework on
@ -87,8 +88,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* *
* @param pipeline - Pipeline that defines the machines. * @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config * @param config -- Ozone Config
* @param caCert - SCM ca certificate.
*/ */
public XceiverClientGrpc(Pipeline pipeline, Configuration config) { public XceiverClientGrpc(Pipeline pipeline, Configuration config,
X509Certificate caCert) {
super(); super();
Preconditions.checkNotNull(pipeline); Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config); Preconditions.checkNotNull(config);
@ -103,6 +106,18 @@ public class XceiverClientGrpc extends XceiverClientSpi {
this.topologyAwareRead = config.getBoolean( this.topologyAwareRead = config.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
this.caCert = caCert;
}
/**
* Constructs a client that can communicate with the Container framework on
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
this(pipeline, config, null);
} }
/** /**
@ -151,19 +166,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
.intercept(new ClientCredentialInterceptor(userName, encodedToken), .intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor()); new GrpcClientInterceptor());
if (secConfig.isGrpcTlsEnabled()) { if (secConfig.isGrpcTlsEnabled()) {
File trustCertCollectionFile = secConfig.getTrustStoreFile(COMPONENT);
File privateKeyFile = secConfig.getClientPrivateKeyFile(COMPONENT);
File clientCertChainFile = secConfig.getClientCertChainFile(COMPONENT);
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient(); SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if (trustCertCollectionFile != null) { if (caCert != null) {
sslContextBuilder.trustManager(trustCertCollectionFile); sslContextBuilder.trustManager(caCert);
} }
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null
&& privateKeyFile != null) {
sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile);
}
if (secConfig.useTestCert()) { if (secConfig.useTestCert()) {
channelBuilder.overrideAuthority("localhost"); channelBuilder.overrideAuthority("localhost");
} }

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -65,6 +69,7 @@ public class XceiverClientManager implements Closeable {
private final Configuration conf; private final Configuration conf;
private final Cache<String, XceiverClientSpi> clientCache; private final Cache<String, XceiverClientSpi> clientCache;
private final boolean useRatis; private final boolean useRatis;
private X509Certificate caCert;
private static XceiverClientMetrics metrics; private static XceiverClientMetrics metrics;
private boolean isSecurityEnabled; private boolean isSecurityEnabled;
@ -74,11 +79,13 @@ public class XceiverClientManager implements Closeable {
* *
* @param conf configuration * @param conf configuration
*/ */
public XceiverClientManager(Configuration conf) { public XceiverClientManager(Configuration conf) throws IOException {
this(conf, OzoneConfiguration.of(conf).getObject(ScmClientConfig.class)); this(conf, OzoneConfiguration.of(conf).getObject(ScmClientConfig.class),
null);
} }
public XceiverClientManager(Configuration conf, ScmClientConfig clientConf) { public XceiverClientManager(Configuration conf, ScmClientConfig clientConf,
String caCertPem) throws IOException {
Preconditions.checkNotNull(clientConf); Preconditions.checkNotNull(clientConf);
Preconditions.checkNotNull(conf); Preconditions.checkNotNull(conf);
long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS); long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
@ -87,6 +94,16 @@ public class XceiverClientManager implements Closeable {
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.conf = conf; this.conf = conf;
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf); this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
if (isSecurityEnabled) {
Preconditions.checkNotNull(caCertPem);
try {
this.caCert = CertificateCodec.getX509Cert(caCertPem);
} catch (CertificateException ex) {
throw new SCMSecurityException("Error: Fail to get SCM CA certificate",
ex);
}
}
this.clientCache = CacheBuilder.newBuilder() this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, MILLISECONDS) .expireAfterAccess(staleThresholdMs, MILLISECONDS)
.maximumSize(clientConf.getMaxSize()) .maximumSize(clientConf.getMaxSize())
@ -211,11 +228,12 @@ public class XceiverClientManager implements Closeable {
XceiverClientSpi client = null; XceiverClientSpi client = null;
switch (type) { switch (type) {
case RATIS: case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
caCert);
client.connect(); client.connect();
break; break;
case STAND_ALONE: case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf); client = new XceiverClientGrpc(pipeline, conf, caCert);
break; break;
case CHAINED: case CHAINED:
default: default:

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm; package org.apache.hadoop.hdds.scm;
import java.io.IOException; import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -78,6 +79,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
public static XceiverClientRatis newXceiverClientRatis( public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
Configuration ozoneConf) { Configuration ozoneConf) {
return newXceiverClientRatis(pipeline, ozoneConf, null);
}
public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
Configuration ozoneConf, X509Certificate caCert) {
final String rpcType = ozoneConf final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@ -87,7 +94,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
HddsClientUtils.getMaxOutstandingRequests(ozoneConf); HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf)); SecurityConfig(ozoneConf), caCert);
return new XceiverClientRatis(pipeline, return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy, tlsConfig, clientRequestTimeout); retryPolicy, tlsConfig, clientRequestTimeout);

View File

@ -176,34 +176,18 @@ public final class HddsConfigKeys {
private HddsConfigKeys() { private HddsConfigKeys() {
} }
// Enable TLS for GRPC clients/server in ozone.
public static final String HDDS_GRPC_TLS_ENABLED = "hdds.grpc.tls.enabled"; public static final String HDDS_GRPC_TLS_ENABLED = "hdds.grpc.tls.enabled";
public static final boolean HDDS_GRPC_TLS_ENABLED_DEFAULT = false; public static final boolean HDDS_GRPC_TLS_ENABLED_DEFAULT = false;
public static final String HDDS_GRPC_MUTUAL_TLS_REQUIRED = // Choose TLS provider the default is set to OPENSSL for better performance.
"hdds.grpc.mutual.tls.required";
public static final boolean HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT = false;
public static final String HDDS_GRPC_TLS_PROVIDER = "hdds.grpc.tls.provider"; public static final String HDDS_GRPC_TLS_PROVIDER = "hdds.grpc.tls.provider";
public static final String HDDS_GRPC_TLS_PROVIDER_DEFAULT = "OPENSSL"; public static final String HDDS_GRPC_TLS_PROVIDER_DEFAULT = "OPENSSL";
public static final String HDDS_TRUST_STORE_FILE_NAME = // Test only settings for using test signed certificate, authority assume to
"hdds.trust.cert.collection.file.name"; // be localhost.
public static final String HDDS_TRUST_STORE_FILE_NAME_DEFAULT = "ca.crt";
public static final String
HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME =
"hdds.server.cert.chain.file.name";
public static final String
HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT = "server.crt";
public static final String
HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME =
"hdds.client.cert.chain.file.name";
public static final String
HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT = "client.crt";
public static final String HDDS_GRPC_TLS_TEST_CERT = "hdds.grpc.tls" + public static final String HDDS_GRPC_TLS_TEST_CERT = "hdds.grpc.tls" +
".test_cert"; ".test.cert";
public static final boolean HDDS_GRPC_TLS_TEST_CERT_DEFAULT = false; public static final boolean HDDS_GRPC_TLS_TEST_CERT_DEFAULT = false;
// Comma separated acls (users, groups) allowing clients accessing // Comma separated acls (users, groups) allowing clients accessing

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.ratis; package org.apache.hadoop.hdds.ratis;
import java.io.IOException; import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -31,7 +33,11 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
@ -200,29 +206,47 @@ public interface RatisHelper {
return builder.build(); return builder.build();
} }
static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf) { // For External gRPC client to server with gRPC TLS.
if (conf.isGrpcTlsEnabled()) { // No mTLS for external client as SCM CA does not issued certificates for them
if (conf.isGrpcMutualTlsRequired()) { static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf,
return new GrpcTlsConfig(conf.getClientPrivateKeyFile(), X509Certificate caCert) {
conf.getClientCertChainFile(), conf.getTrustStoreFile(), true); GrpcTlsConfig tlsConfig = null;
} else { if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
return new GrpcTlsConfig( tlsConfig = new GrpcTlsConfig(null, null,
null, null, conf.getTrustStoreFile(), false); caCert, false);
}
return tlsConfig;
}
// For Internal gRPC client from SCM to DN with gRPC TLS
static GrpcTlsConfig createTlsClientConfigForSCM(SecurityConfig conf,
CertificateServer certificateServer) throws IOException {
if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
try {
X509Certificate caCert =
CertificateCodec.getX509Certificate(
certificateServer.getCACertificate());
return new GrpcTlsConfig(null, null,
caCert, false);
} catch (CertificateException ex) {
throw new SCMSecurityException("Fail to find SCM CA certificate.", ex);
} }
} }
return null; return null;
} }
static GrpcTlsConfig createTlsServerConfig(SecurityConfig conf) { // For gRPC server running DN container service with gPRC TLS
if (conf.isGrpcTlsEnabled()) { // No mTLS as the channel is shared for for external client, which
if (conf.isGrpcMutualTlsRequired()) { // does not have SCM CA issued certificates.
// In summary:
// authenticate from server to client is via TLS.
// authenticate from client to server is via block token (or container token).
static GrpcTlsConfig createTlsServerConfigForDN(SecurityConfig conf,
CertificateClient caClient) {
if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
return new GrpcTlsConfig( return new GrpcTlsConfig(
conf.getServerPrivateKeyFile(), conf.getServerCertChainFile(), null, caClient.getPrivateKey(), caClient.getCertificate(),
false); null, false);
} else {
return new GrpcTlsConfig(conf.getServerPrivateKeyFile(),
conf.getServerCertChainFile(), conf.getClientCertChainFile(), true);
}
} }
return null; return null;
} }

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hdds.security.x509; package org.apache.hadoop.hdds.security.x509;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider;
@ -28,7 +27,6 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.security.Provider; import java.security.Provider;
@ -47,14 +45,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER_DEFAULT; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT_DEFAULT; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_MUTUAL_TLS_REQUIRED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_TRUST_STORE_FILE_NAME;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_TRUST_STORE_FILE_NAME_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_ALGORITHM; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_ALGORITHM;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
@ -106,12 +96,8 @@ public class SecurityConfig {
private final String certificateFileName; private final String certificateFileName;
private final boolean grpcTlsEnabled; private final boolean grpcTlsEnabled;
private boolean grpcTlsUseTestCert; private boolean grpcTlsUseTestCert;
private String trustStoreFileName;
private String serverCertChainFileName;
private String clientCertChainFileName;
private final Duration defaultCertDuration; private final Duration defaultCertDuration;
private final boolean isSecurityEnabled; private final boolean isSecurityEnabled;
private boolean grpcMutualTlsRequired;
/** /**
* Constructs a SecurityConfig. * Constructs a SecurityConfig.
@ -158,20 +144,6 @@ public class SecurityConfig {
HDDS_GRPC_TLS_ENABLED_DEFAULT); HDDS_GRPC_TLS_ENABLED_DEFAULT);
if (grpcTlsEnabled) { if (grpcTlsEnabled) {
this.grpcMutualTlsRequired = configuration.getBoolean(
HDDS_GRPC_MUTUAL_TLS_REQUIRED, HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT);
this.trustStoreFileName = this.configuration.get(
HDDS_TRUST_STORE_FILE_NAME, HDDS_TRUST_STORE_FILE_NAME_DEFAULT);
this.clientCertChainFileName = this.configuration.get(
HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME,
HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT);
this.serverCertChainFileName = this.configuration.get(
HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME,
HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT);
this.grpcTlsUseTestCert = this.configuration.getBoolean( this.grpcTlsUseTestCert = this.configuration.getBoolean(
HDDS_GRPC_TLS_TEST_CERT, HDDS_GRPC_TLS_TEST_CERT_DEFAULT); HDDS_GRPC_TLS_TEST_CERT, HDDS_GRPC_TLS_TEST_CERT_DEFAULT);
} }
@ -351,115 +323,6 @@ public class SecurityConfig {
return this.grpcTlsEnabled; return this.grpcTlsEnabled;
} }
/**
* Returns true if TLS mutual authentication is enabled for gRPC services.
* @return true if TLS is enabled for gRPC services.
*/
public boolean isGrpcMutualTlsRequired() {
return this.grpcMutualTlsRequired;
}
/**
* Returns the TLS-enabled gRPC client private key file(Only needed for mutual
* authentication) for the given component.
* @param component name of the component.
* @return the TLS-enabled gRPC client private key file.
*/
public File getClientPrivateKeyFile(String component) {
return Paths.get(getKeyLocation(component).toString(),
"client." + privateKeyFileName).toFile();
}
/**
* Returns the TLS-enabled gRPC client private key file(Only needed for mutual
* authentication).
* @return the TLS-enabled gRPC client private key file.
*/
public File getClientPrivateKeyFile() {
return getClientPrivateKeyFile(StringUtils.EMPTY);
}
/**
* Returns the TLS-enabled gRPC server private key file for the given
* component.
* @param component name of the component.
* @return the TLS-enabled gRPC server private key file.
*/
public File getServerPrivateKeyFile(String component) {
return Paths.get(getKeyLocation(component).toString(),
"server." + privateKeyFileName).toFile();
}
/**
* Returns the TLS-enabled gRPC server private key file.
* @return the TLS-enabled gRPC server private key file.
*/
public File getServerPrivateKeyFile() {
return getServerPrivateKeyFile(StringUtils.EMPTY);
}
/**
* Get the trusted CA certificate file for the given component. (CA
* certificate)
* @param component name of the component.
* @return the trusted CA certificate.
*/
public File getTrustStoreFile(String component) {
return Paths.get(getKeyLocation(component).toString(),
trustStoreFileName).
toFile();
}
/**
* Get the trusted CA certificate file. (CA certificate)
* @return the trusted CA certificate.
*/
public File getTrustStoreFile() {
return getTrustStoreFile(StringUtils.EMPTY);
}
/**
* Get the TLS-enabled gRPC Client certificate chain file for the given
* component (only needed for
* mutual authentication).
* @param component name of the component.
* @return the TLS-enabled gRPC Server certificate chain file.
*/
public File getClientCertChainFile(String component) {
return Paths.get(getKeyLocation(component).toString(),
clientCertChainFileName).
toFile();
}
/**
* Get the TLS-enabled gRPC Client certificate chain file (only needed for
* mutual authentication).
* @return the TLS-enabled gRPC Server certificate chain file.
*/
public File getClientCertChainFile() {
return getClientCertChainFile(StringUtils.EMPTY);
}
/**
* Get the TLS-enabled gRPC Server certificate chain file for the given
* component.
* @param component name of the component.
* @return the TLS-enabled gRPC Server certificate chain file.
*/
public File getServerCertChainFile(String component) {
return Paths.get(getKeyLocation(component).toString(),
serverCertChainFileName).
toFile();
}
/**
* Get the TLS-enabled gRPC Server certificate chain file.
* @return the TLS-enabled gRPC Server certificate chain file.
*/
public File getServerCertChainFile() {
return getServerCertChainFile(StringUtils.EMPTY);
}
/** /**
* Get the gRPC TLS provider. * Get the gRPC TLS provider.
* @return the gRPC TLS Provider. * @return the gRPC TLS Provider.

View File

@ -69,6 +69,12 @@ public interface CertificateClient {
*/ */
X509Certificate getCertificate(); X509Certificate getCertificate();
/**
* Return the latest CA certificate known to the client.
* @return latest ca certificate known to the client.
*/
X509Certificate getCACertificate();
/** /**
* Verifies if this certificate is part of a trusted chain. * Verifies if this certificate is part of a trusted chain.
* @param certificate - certificate. * @param certificate - certificate.

View File

@ -20,7 +20,9 @@
package org.apache.hadoop.hdds.security.x509.certificate.client; package org.apache.hadoop.hdds.security.x509.certificate.client;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.validator.routines.DomainValidator; import org.apache.commons.validator.routines.DomainValidator;
import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
@ -81,6 +83,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
private static final String CERT_FILE_NAME_FORMAT = "%s.crt"; private static final String CERT_FILE_NAME_FORMAT = "%s.crt";
private static final String CA_CERT_PREFIX = "CA-"; private static final String CA_CERT_PREFIX = "CA-";
private static final int CA_CERT_PREFIX_LEN = 3;
private final Logger logger; private final Logger logger;
private final SecurityConfig securityConfig; private final SecurityConfig securityConfig;
private final KeyCodec keyCodec; private final KeyCodec keyCodec;
@ -89,9 +92,9 @@ public abstract class DefaultCertificateClient implements CertificateClient {
private X509Certificate x509Certificate; private X509Certificate x509Certificate;
private Map<String, X509Certificate> certificateMap; private Map<String, X509Certificate> certificateMap;
private String certSerialId; private String certSerialId;
private String caCertId;
private String component; private String component;
DefaultCertificateClient(SecurityConfig securityConfig, Logger log, DefaultCertificateClient(SecurityConfig securityConfig, Logger log,
String certSerialId, String component) { String certSerialId, String component) {
Objects.requireNonNull(securityConfig); Objects.requireNonNull(securityConfig);
@ -119,6 +122,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
if (certFiles != null) { if (certFiles != null) {
CertificateCodec certificateCodec = CertificateCodec certificateCodec =
new CertificateCodec(securityConfig, component); new CertificateCodec(securityConfig, component);
long latestCaCertSerailId = -1L;
for (File file : certFiles) { for (File file : certFiles) {
if (file.isFile()) { if (file.isFile()) {
try { try {
@ -132,6 +136,15 @@ public abstract class DefaultCertificateClient implements CertificateClient {
} }
certificateMap.putIfAbsent(cert.getSerialNumber().toString(), certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
cert); cert);
if (file.getName().startsWith(CA_CERT_PREFIX)) {
String certFileName = FilenameUtils.getBaseName(
file.getName());
long tmpCaCertSerailId = NumberUtils.toLong(
certFileName.substring(CA_CERT_PREFIX_LEN));
if (tmpCaCertSerailId > latestCaCertSerailId) {
latestCaCertSerailId = tmpCaCertSerailId;
}
}
getLogger().info("Added certificate from file:{}.", getLogger().info("Added certificate from file:{}.",
file.getAbsolutePath()); file.getAbsolutePath());
} else { } else {
@ -144,6 +157,9 @@ public abstract class DefaultCertificateClient implements CertificateClient {
} }
} }
} }
if (latestCaCertSerailId != -1) {
caCertId = Long.toString(latestCaCertSerailId);
}
} }
} }
} }
@ -221,6 +237,18 @@ public abstract class DefaultCertificateClient implements CertificateClient {
return x509Certificate; return x509Certificate;
} }
/**
* Return the latest CA certificate known to the client.
* @return latest ca certificate known to the client.
*/
@Override
public X509Certificate getCACertificate() {
if (caCertId != null) {
return certificateMap.get(caCertId);
}
return null;
}
/** /**
* Returns the certificate with the specified certificate serial id if it * Returns the certificate with the specified certificate serial id if it
* exists else try to get it from SCM. * exists else try to get it from SCM.
@ -491,6 +519,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
if(caCert) { if(caCert) {
certName = CA_CERT_PREFIX + certName; certName = CA_CERT_PREFIX + certName;
caCertId = cert.getSerialNumber().toString();
} }
certificateCodec.writeCertificate(basePath, certName, certificateCodec.writeCertificate(basePath, certName,

View File

@ -1837,39 +1837,12 @@
<tag>OZONE, HDDS, SECURITY, TLS</tag> <tag>OZONE, HDDS, SECURITY, TLS</tag>
<description>HDDS GRPC server TLS provider.</description> <description>HDDS GRPC server TLS provider.</description>
</property> </property>
<property>
<name>hdds.client.cert.chain.file.name</name>
<value>client.crt</value>
<tag>OZONE, HDDS, SECURITY</tag>
<description>Client certificate file name. It is an optional
field only required when mutual TLS (hdds.grpc.mutual.tls.required)
is set to true .</description>
</property>
<property>
<name>hdds.grpc.mutual.tls.required</name>
<value>false</value>
<tag>OZONE, HDDS, SECURITY, TLS</tag>
<description>If mutual tls check is enabled for GRPC.
Considered only if hdds.grpc.tls.enabled is set to true.</description>
</property>
<property> <property>
<name>hdds.grpc.tls.enabled</name> <name>hdds.grpc.tls.enabled</name>
<value>false</value> <value>false</value>
<tag>OZONE, HDDS, SECURITY, TLS</tag> <tag>OZONE, HDDS, SECURITY, TLS</tag>
<description>If HDDS GRPC server TLS is enabled.</description> <description>If HDDS GRPC server TLS is enabled.</description>
</property> </property>
<property>
<name>hdds.server.cert.chain.file.name</name>
<value>server.crt</value>
<tag>OZONE, HDDS, SECURITY</tag>
<description>Hdds server certificate file name.</description>
</property>
<property>
<name>hdds.trust.cert.collection.file.name</name>
<value>ca.crt</value>
<tag>OZONE, HDDS, SECURITY</tag>
<description>HDDS Certificate Authority trust store file name.</description>
</property>
<property> <property>
<name>hdds.x509.default.duration</name> <name>hdds.x509.default.duration</name>
<value>P365D</value> <value>P365D</value>

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
@InterfaceAudience.Private @InterfaceAudience.Private
@Metrics(about="Storage Container DataNode Metrics", context="dfs") @Metrics(about="Storage Container DataNode Metrics", context="dfs")
public class ContainerMetrics { public class ContainerMetrics {
public static final String STORAGE_CONTAINER_METRICS =
"StorageContainerMetrics";
@Metric private MutableCounterLong numOps; @Metric private MutableCounterLong numOps;
private MutableCounterLong[] numOpsArray; private MutableCounterLong[] numOpsArray;
private MutableCounterLong[] opsBytesArray; private MutableCounterLong[] opsBytesArray;
@ -89,11 +91,16 @@ public class ContainerMetrics {
// Percentile measurement is off by default, by watching no intervals // Percentile measurement is off by default, by watching no intervals
int[] intervals = int[] intervals =
conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY); conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
return ms.register("StorageContainerMetrics", return ms.register(STORAGE_CONTAINER_METRICS,
"Storage Container Node Metrics", "Storage Container Node Metrics",
new ContainerMetrics(intervals)); new ContainerMetrics(intervals));
} }
public static void remove() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(STORAGE_CONTAINER_METRICS);
}
public void incContainerOpsMetrics(ContainerProtos.Type type) { public void incContainerOpsMetrics(ContainerProtos.Type type) {
numOps.incr(); numOps.incr();
numOpsArray[type.ordinal()].incr(); numOpsArray[type.ordinal()].incr();

View File

@ -45,12 +45,10 @@ import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors; import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -112,21 +110,9 @@ public final class XceiverServerGrpc extends XceiverServer {
} }
if (getSecConfig().isGrpcTlsEnabled()) { if (getSecConfig().isGrpcTlsEnabled()) {
File privateKeyFilePath =
getSecurityConfig().getServerPrivateKeyFile(COMPONENT);
File serverCertChainFilePath =
getSecurityConfig().getServerCertChainFile(COMPONENT);
File clientCertChainFilePath =
getSecurityConfig().getClientCertChainFile(COMPONENT);
try { try {
SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer( SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(
serverCertChainFilePath, privateKeyFilePath); caClient.getPrivateKey(), caClient.getCertificate());
if (getSecurityConfig().isGrpcMutualTlsRequired() &&
clientCertChainFilePath != null) {
// Only needed for mutual TLS
sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
sslClientContextBuilder.trustManager(clientCertChainFilePath);
}
SslContextBuilder sslContextBuilder = GrpcSslContexts.configure( SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
sslClientContextBuilder, getSecurityConfig().getGrpcSslProvider()); sslClientContextBuilder, getSecurityConfig().getGrpcSslProvider());
nettyServerBuilder.sslContext(sslContextBuilder.build()); nettyServerBuilder.sslContext(sslContextBuilder.build());

View File

@ -398,8 +398,8 @@ public final class XceiverServerRatis extends XceiverServer {
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
localPort = 0; localPort = 0;
} }
GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig( GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
new SecurityConfig(ozoneConf)); new SecurityConfig(ozoneConf), caClient);
return new XceiverServerRatis(datanodeDetails, localPort, dispatcher, return new XceiverServerRatis(datanodeDetails, localPort, dispatcher,
containerController, context, tlsConfig, caClient, ozoneConf); containerController, context, tlsConfig, caClient, ozoneConf);

View File

@ -236,6 +236,7 @@ public class OzoneContainer {
hddsDispatcher.shutdown(); hddsDispatcher.shutdown();
volumeSet.shutdown(); volumeSet.shutdown();
blockDeletingService.shutdown(); blockDeletingService.shutdown();
ContainerMetrics.remove();
} }

View File

@ -348,7 +348,6 @@ public class ProfileServlet extends HttpServlet {
final HttpServletResponse resp) final HttpServletResponse resp)
throws IOException { throws IOException {
;
String safeFileName = validateFileName(fileName); String safeFileName = validateFileName(fileName);
File requestedFile = File requestedFile =
ProfileServlet.OUTPUT_DIR ProfileServlet.OUTPUT_DIR

View File

@ -17,20 +17,11 @@
*/ */
package org.apache.hadoop.hdds.server; package org.apache.hadoop.hdds.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.hdds.server.ProfileServlet.Event; import org.apache.hadoop.hdds.server.ProfileServlet.Event;
import org.apache.hadoop.hdds.server.ProfileServlet.Output; import org.apache.hadoop.hdds.server.ProfileServlet.Output;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
/** /**

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.ratis.grpc.GrpcTlsConfig;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -38,12 +39,12 @@ public final class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers; private Map<ReplicationType, PipelineProvider> providers;
PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager, PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
Configuration conf) { Configuration conf, GrpcTlsConfig tlsConfig) {
providers = new HashMap<>(); providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE, providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager)); new SimplePipelineProvider(nodeManager));
providers.put(ReplicationType.RATIS, providers.put(ReplicationType.RATIS,
new RatisPipelineProvider(nodeManager, stateManager, conf)); new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
} }
@VisibleForTesting @VisibleForTesting

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.ratis.grpc.GrpcTlsConfig;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -94,4 +95,5 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
*/ */
void deactivatePipeline(PipelineID pipelineID) throws IOException; void deactivatePipeline(PipelineID pipelineID) throws IOException;
GrpcTlsConfig getGrpcTlsConfig();
} }

View File

@ -97,7 +97,8 @@ public class PipelineReportHandler implements
try { try {
pipeline = pipelineManager.getPipeline(pipelineID); pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) { } catch (PipelineNotFoundException e) {
RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf); RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
pipelineManager.getGrpcTlsConfig());
return; return;
} }

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClient;
@ -84,13 +83,15 @@ public class RatisPipelineProvider implements PipelineProvider {
private final ForkJoinPool forkJoinPool = new ForkJoinPool( private final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelismForPool, factory, null, false); parallelismForPool, factory, null, false);
private final GrpcTlsConfig tlsConfig;
RatisPipelineProvider(NodeManager nodeManager, RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf) { PipelineStateManager stateManager, Configuration conf,
GrpcTlsConfig tlsConfig) {
this.nodeManager = nodeManager; this.nodeManager = nodeManager;
this.stateManager = stateManager; this.stateManager = stateManager;
this.conf = conf; this.conf = conf;
this.tlsConfig = tlsConfig;
} }
@ -217,8 +218,6 @@ public class RatisPipelineProvider implements PipelineProvider {
Collections.synchronizedList(new ArrayList<>()); Collections.synchronizedList(new ArrayList<>());
final int maxOutstandingRequests = final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(conf); HddsClientUtils.getMaxOutstandingRequests(conf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(conf));
final TimeDuration requestTimeout = final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(conf); RatisHelper.getClientRequestTimeout(conf);
try { try {

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.GrpcTlsConfig;
@ -54,14 +53,16 @@ public final class RatisPipelineUtils {
* *
* @param pipeline - Pipeline to be destroyed * @param pipeline - Pipeline to be destroyed
* @param ozoneConf - Ozone configuration * @param ozoneConf - Ozone configuration
* @param grpcTlsConfig
* @throws IOException * @throws IOException
*/ */
static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf) { static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
GrpcTlsConfig grpcTlsConfig) {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline); final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
for (DatanodeDetails dn : pipeline.getNodes()) { for (DatanodeDetails dn : pipeline.getNodes()) {
try { try {
destroyPipeline(dn, pipeline.getId(), ozoneConf); destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Pipeline destroy failed for pipeline={} dn={}", LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
pipeline.getId(), dn); pipeline.getId(), dn);
@ -75,10 +76,11 @@ public final class RatisPipelineUtils {
* @param dn - Datanode on which pipeline needs to be destroyed * @param dn - Datanode on which pipeline needs to be destroyed
* @param pipelineID - ID of pipeline to be destroyed * @param pipelineID - ID of pipeline to be destroyed
* @param ozoneConf - Ozone configuration * @param ozoneConf - Ozone configuration
* @param grpcTlsConfig - grpc tls configuration
* @throws IOException * @throws IOException
*/ */
static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
Configuration ozoneConf) throws IOException { Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
final String rpcType = ozoneConf final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@ -86,13 +88,12 @@ public final class RatisPipelineUtils {
final RaftPeer p = RatisHelper.toRaftPeer(dn); final RaftPeer p = RatisHelper.toRaftPeer(dn);
final int maxOutstandingRequests = final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf); HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
new SecurityConfig(ozoneConf));
final TimeDuration requestTimeout = final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf); RatisHelper.getClientRequestTimeout(ozoneConf);
try(RaftClient client = RatisHelper try(RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) { retryPolicy, maxOutstandingRequests, grpcTlsConfig,
requestTimeout)) {
client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
true, p.getId()); true, p.getId());
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataStore; import org.apache.hadoop.hdds.utils.MetadataStore;
import org.apache.hadoop.hdds.utils.MetadataStoreBuilder; import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -82,14 +83,16 @@ public class SCMPipelineManager implements PipelineManager {
private final Configuration conf; private final Configuration conf;
// Pipeline Manager MXBean // Pipeline Manager MXBean
private ObjectName pmInfoBean; private ObjectName pmInfoBean;
private GrpcTlsConfig grpcTlsConfig;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager, public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher) throws IOException { EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
throws IOException {
this.lock = new ReentrantReadWriteLock(); this.lock = new ReentrantReadWriteLock();
this.conf = conf; this.conf = conf;
this.stateManager = new PipelineStateManager(conf); this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
conf); conf, grpcTlsConfig);
// TODO: See if thread priority needs to be set for these threads // TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator = this.backgroundPipelineCreator =
@ -111,6 +114,7 @@ public class SCMPipelineManager implements PipelineManager {
this.pmInfoBean = MBeans.register("SCMPipelineManager", this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this); "SCMPipelineManagerInfo", this);
initializePipelineState(); initializePipelineState();
this.grpcTlsConfig = grpcTlsConfig;
} }
public PipelineStateManager getStateManager() { public PipelineStateManager getStateManager() {
@ -404,7 +408,7 @@ public class SCMPipelineManager implements PipelineManager {
* @throws IOException * @throws IOException
*/ */
private void destroyPipeline(Pipeline pipeline) throws IOException { private void destroyPipeline(Pipeline pipeline) throws IOException {
RatisPipelineUtils.destroyPipeline(pipeline, conf); RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
// remove the pipeline from the pipeline manager // remove the pipeline from the pipeline manager
removePipeline(pipeline.getId()); removePipeline(pipeline.getId());
triggerPipelineCreation(); triggerPipelineCreation();
@ -436,6 +440,11 @@ public class SCMPipelineManager implements PipelineManager {
metrics.incNumBlocksAllocated(id); metrics.incNumBlocksAllocated(id);
} }
@Override
public GrpcTlsConfig getGrpcTlsConfig() {
return grpcTlsConfig;
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (scheduler != null) { if (scheduler != null) {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManager;
@ -100,6 +101,7 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.hdds.utils.HddsVersionInfo; import org.apache.hadoop.hdds.utils.HddsVersionInfo;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -186,6 +188,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private SCMSafeModeManager scmSafeModeManager; private SCMSafeModeManager scmSafeModeManager;
private CertificateServer certificateServer; private CertificateServer certificateServer;
private GrpcTlsConfig grpcTlsConfig;
private JvmPauseMonitor jvmPauseMonitor; private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration; private final OzoneConfiguration configuration;
@ -399,7 +402,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
pipelineManager = configurator.getPipelineManager(); pipelineManager = configurator.getPipelineManager();
} else { } else {
pipelineManager = pipelineManager =
new SCMPipelineManager(conf, scmNodeManager, eventQueue); new SCMPipelineManager(conf, scmNodeManager, eventQueue,
grpcTlsConfig);
} }
if (configurator.getContainerManager() != null) { if (configurator.getContainerManager() != null) {
@ -443,8 +447,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* @throws AuthenticationException - on Failure * @throws AuthenticationException - on Failure
*/ */
private void initializeCAnSecurityProtocol(OzoneConfiguration conf, private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
SCMConfigurator configurator) SCMConfigurator configurator) throws IOException {
throws IOException {
if(configurator.getCertificateServer() != null) { if(configurator.getCertificateServer() != null) {
this.certificateServer = configurator.getCertificateServer(); this.certificateServer = configurator.getCertificateServer();
} else { } else {
@ -458,6 +461,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
CertificateServer.CAType.SELF_SIGNED_CA); CertificateServer.CAType.SELF_SIGNED_CA);
securityProtocolServer = new SCMSecurityProtocolServer(conf, securityProtocolServer = new SCMSecurityProtocolServer(conf,
certificateServer); certificateServer);
grpcTlsConfig = RatisHelper
.createTlsClientConfigForSCM(new SecurityConfig(conf),
certificateServer);
} }
/** /**

View File

@ -67,7 +67,7 @@ public class TestCloseContainerEventHandler {
.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
nodeManager = new MockNodeManager(true, 10); nodeManager = new MockNodeManager(true, 10);
pipelineManager = pipelineManager =
new SCMPipelineManager(configuration, nodeManager, eventQueue); new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), configuration); pipelineManager.getStateManager(), configuration);

View File

@ -94,7 +94,7 @@ public class TestSCMContainerManager {
} }
nodeManager = new MockNodeManager(true, 10); nodeManager = new MockNodeManager(true, 10);
pipelineManager = pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue()); new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
containerManager = new SCMContainerManager(conf, nodeManager, containerManager = new SCMContainerManager(conf, nodeManager,
pipelineManager, new EventQueue()); pipelineManager, new EventQueue());
xceiverClientManager = new XceiverClientManager(conf); xceiverClientManager = new XceiverClientManager(conf);

View File

@ -65,8 +65,6 @@ import static org.junit.Assert.assertEquals;
public class TestContainerPlacement { public class TestContainerPlacement {
@Rule @Rule
public ExpectedException thrown = ExpectedException.none(); public ExpectedException thrown = ExpectedException.none();
private static XceiverClientManager xceiverClientManager =
new XceiverClientManager(new OzoneConfiguration());
/** /**
* Returns a new copy of Configuration. * Returns a new copy of Configuration.
@ -109,7 +107,7 @@ public class TestContainerPlacement {
final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT); OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
PipelineManager pipelineManager = PipelineManager pipelineManager =
new SCMPipelineManager(config, scmNodeManager, eventQueue); new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
return new SCMContainerManager(config, scmNodeManager, pipelineManager, return new SCMContainerManager(config, scmNodeManager, pipelineManager,
eventQueue); eventQueue);
@ -144,6 +142,7 @@ public class TestContainerPlacement {
createContainerManager(conf, nodeManager); createContainerManager(conf, nodeManager);
List<DatanodeDetails> datanodes = List<DatanodeDetails> datanodes =
TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount); TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount);
XceiverClientManager xceiverClientManager = null;
try { try {
for (DatanodeDetails datanodeDetails : datanodes) { for (DatanodeDetails datanodeDetails : datanodes) {
nodeManager.processHeartbeat(datanodeDetails); nodeManager.processHeartbeat(datanodeDetails);
@ -159,6 +158,8 @@ public class TestContainerPlacement {
assertEquals(remaining * nodeCount, assertEquals(remaining * nodeCount,
(long) nodeManager.getStats().getRemaining().get()); (long) nodeManager.getStats().getRemaining().get());
xceiverClientManager= new XceiverClientManager(new OzoneConfiguration());
ContainerInfo container = containerManager ContainerInfo container = containerManager
.allocateContainer( .allocateContainer(
xceiverClientManager.getType(), xceiverClientManager.getType(),
@ -169,6 +170,9 @@ public class TestContainerPlacement {
} finally { } finally {
IOUtils.closeQuietly(containerManager); IOUtils.closeQuietly(containerManager);
IOUtils.closeQuietly(nodeManager); IOUtils.closeQuietly(nodeManager);
if (xceiverClientManager != null) {
xceiverClientManager.close();
}
FileUtil.fullyDelete(testDir); FileUtil.fullyDelete(testDir);
} }
} }

View File

@ -31,7 +31,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
public MockRatisPipelineProvider(NodeManager nodeManager, public MockRatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, PipelineStateManager stateManager,
Configuration conf) { Configuration conf) {
super(nodeManager, stateManager, conf); super(nodeManager, stateManager, conf, null);
} }
protected void initializePipeline(Pipeline pipeline) throws IOException { protected void initializePipeline(Pipeline pipeline) throws IOException {

View File

@ -71,7 +71,7 @@ public class TestHealthyPipelineSafeModeRule {
SCMPipelineManager pipelineManager = new SCMPipelineManager(config, SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue); nodeManager, eventQueue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config); pipelineManager.getStateManager(), config);
@ -116,7 +116,7 @@ public class TestHealthyPipelineSafeModeRule {
SCMPipelineManager pipelineManager = new SCMPipelineManager(config, SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue); nodeManager, eventQueue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
@ -191,7 +191,7 @@ public class TestHealthyPipelineSafeModeRule {
SCMPipelineManager pipelineManager = new SCMPipelineManager(config, SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, eventQueue); nodeManager, eventQueue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config); pipelineManager.getStateManager(), config);

View File

@ -71,7 +71,7 @@ public class TestOneReplicaPipelineSafeModeRule {
eventQueue = new EventQueue(); eventQueue = new EventQueue();
pipelineManager = pipelineManager =
new SCMPipelineManager(ozoneConfiguration, mockNodeManager, new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
eventQueue); eventQueue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager, new MockRatisPipelineProvider(mockNodeManager,

View File

@ -197,7 +197,7 @@ public class TestSCMSafeModeManager {
0.9); 0.9);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10); MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf, PipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue); mockNodeManager, queue, null);
scmSafeModeManager = new SCMSafeModeManager( scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue); conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForHealthyPipelinePercent"); fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@ -215,7 +215,7 @@ public class TestSCMSafeModeManager {
200); 200);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10); MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf, PipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue); mockNodeManager, queue, null);
scmSafeModeManager = new SCMSafeModeManager( scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue); conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForOneReplicaPipelinePercent"); fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@ -232,7 +232,7 @@ public class TestSCMSafeModeManager {
conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0); conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10); MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf, PipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue); mockNodeManager, queue, null);
scmSafeModeManager = new SCMSafeModeManager( scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue); conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForSafeModePercent"); fail("testFailWithIncorrectValueForSafeModePercent");
@ -256,7 +256,7 @@ public class TestSCMSafeModeManager {
MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount); MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
SCMPipelineManager pipelineManager = new SCMPipelineManager(conf, SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue); mockNodeManager, queue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager, new MockRatisPipelineProvider(mockNodeManager,
pipelineManager.getStateManager(), config); pipelineManager.getStateManager(), config);
@ -477,7 +477,7 @@ public class TestSCMSafeModeManager {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true); HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config, SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
nodeManager, queue); nodeManager, queue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.client; package org.apache.hadoop.ozone.client;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;

View File

@ -32,14 +32,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType; .ChecksumType;
import org.apache.hadoop.hdds.scm.ByteStringHelper; import org.apache.hadoop.hdds.scm.ByteStringHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.ozone.client.*;
import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationFactor;
@ -68,21 +65,14 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil; import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB import org.apache.hadoop.ozone.om.protocolPB
.OzoneManagerProtocolClientSideTranslatorPB; .OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.ServicePort;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.security.GDPRSymmetricKey; import org.apache.hadoop.ozone.security.GDPRSymmetricKey;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
@ -102,7 +92,6 @@ import javax.crypto.Cipher;
import javax.crypto.CipherInputStream; import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream; import javax.crypto.CipherOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.util.*; import java.util.*;
@ -122,8 +111,6 @@ public class RpcClient implements ClientProtocol {
LoggerFactory.getLogger(RpcClient.class); LoggerFactory.getLogger(RpcClient.class);
private final OzoneConfiguration conf; private final OzoneConfiguration conf;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final OzoneManagerProtocol ozoneManagerClient; private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private final int chunkSize; private final int chunkSize;
@ -163,21 +150,16 @@ public class RpcClient implements ClientProtocol {
this.conf, clientId.toString(), omServiceId, ugi), this.conf, clientId.toString(), omServiceId, ugi),
OzoneManagerProtocol.class, conf OzoneManagerProtocol.class, conf
); );
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress scmAddress = getScmAddressForClient();
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
StorageContainerLocationProtocolClientSideTranslatorPB client = ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
new StorageContainerLocationProtocolClientSideTranslatorPB( String caCertPem = null;
RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion, if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
scmAddress, ugi, conf, NetUtils.getDefaultSocketFactory(conf), caCertPem = serviceInfoEx.getCaCertificate();
Client.getRpcTimeout(conf))); }
this.storageContainerLocationClient =
TracingUtil.createProxy(client, StorageContainerLocationProtocol.class, this.xceiverClientManager = new XceiverClientManager(conf,
conf); OzoneConfiguration.of(conf).getObject(XceiverClientManager.
this.xceiverClientManager = new XceiverClientManager(conf); ScmClientConfig.class), caCertPem);
int configuredChunkSize = (int) conf int configuredChunkSize = (int) conf
.getStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, .getStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@ -245,15 +227,6 @@ public class RpcClient implements ClientProtocol {
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
} }
private InetSocketAddress getScmAddressForClient() throws IOException {
List<ServiceInfo> services = ozoneManagerClient.getServiceList();
ServiceInfo scmInfo = services.stream().filter(
a -> a.getNodeType().equals(HddsProtos.NodeType.SCM))
.collect(Collectors.toList()).get(0);
return NetUtils.createSocketAddr(
scmInfo.getServiceAddress(ServicePort.Type.RPC));
}
@Override @Override
public void createVolume(String volumeName) throws IOException { public void createVolume(String volumeName) throws IOException {
createVolume(volumeName, VolumeArgs.newBuilder().build()); createVolume(volumeName, VolumeArgs.newBuilder().build());
@ -806,7 +779,6 @@ public class RpcClient implements ClientProtocol {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
IOUtils.cleanupWithLogger(LOG, ozoneManagerClient); IOUtils.cleanupWithLogger(LOG, ozoneManagerClient);
IOUtils.cleanupWithLogger(LOG, xceiverClientManager); IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
} }

View File

@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.om;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.BlockGroup;

View File

@ -16,14 +16,11 @@
*/ */
package org.apache.hadoop.ozone.om.helpers; package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartKeyInfo; .MultipartKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartKeyInfo; .PartKeyInfo;
import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;

View File

@ -20,9 +20,6 @@ package org.apache.hadoop.ozone.om.helpers;
import java.util.List; import java.util.List;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
/** /**
* List of in-flight MPU uploads. * List of in-flight MPU uploads.
*/ */

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartInfo; .PartInfo;

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;
import java.util.List;
/**
* Wrapper class for service discovery, design for broader usage such as
* security, etc.
*/
public class ServiceInfoEx {
private List<ServiceInfo> infoList;
// PEM encoded string of SCM CA certificate.
private String caCertificate;
public ServiceInfoEx(List<ServiceInfo> infoList,
String caCertificate) {
this.infoList = infoList;
this.caCertificate = caCertificate;
}
public List<ServiceInfo> getServiceInfoList() {
return infoList;
}
public String getCaCertificate() {
return caCertificate;
}
}

View File

@ -22,8 +22,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@ -31,15 +29,18 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
@ -288,6 +289,8 @@ public interface OzoneManagerProtocol
*/ */
List<ServiceInfo> getServiceList() throws IOException; List<ServiceInfo> getServiceList() throws IOException;
ServiceInfoEx getServiceInfo() throws IOException;
/* /*
* S3 Specific functionality that is supported by Ozone Manager. * S3 Specific functionality that is supported by Ozone Manager.
*/ */

View File

@ -55,9 +55,10 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
@ -1211,6 +1212,24 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
} }
@Override
public ServiceInfoEx getServiceInfo() throws IOException {
ServiceListRequest req = ServiceListRequest.newBuilder().build();
OMRequest omRequest = createOMRequest(Type.ServiceList)
.setServiceListRequest(req)
.build();
final ServiceListResponse resp = handleError(submitRequest(omRequest))
.getServiceListResponse();
return new ServiceInfoEx(
resp.getServiceInfoList().stream()
.map(ServiceInfo::getFromProtobuf)
.collect(Collectors.toList()),
resp.getCaCertificate());
}
/** /**
* Get a valid Delegation Token. * Get a valid Delegation Token.
* *

View File

@ -878,6 +878,9 @@ message DBUpdatesRequest {
message ServiceListResponse { message ServiceListResponse {
repeated ServiceInfo serviceInfo = 2; repeated ServiceInfo serviceInfo = 2;
// When security is enabled, return SCM CA certificate to Ozone client
// to set up gRPC TLS for client to authenticate server(DN).
optional string caCertificate = 3;
} }
message DBUpdatesResponse { message DBUpdatesResponse {

View File

@ -76,7 +76,7 @@ public class TestSCMPipelineManager {
@Test @Test
public void testPipelineReload() throws IOException { public void testPipelineReload() throws IOException {
SCMPipelineManager pipelineManager = SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue()); new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf); pipelineManager.getStateManager(), conf);
@ -93,7 +93,7 @@ public class TestSCMPipelineManager {
// new pipeline manager should be able to load the pipelines from the db // new pipeline manager should be able to load the pipelines from the db
pipelineManager = pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue()); new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
mockRatisProvider = mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf); pipelineManager.getStateManager(), conf);
@ -116,7 +116,7 @@ public class TestSCMPipelineManager {
@Test @Test
public void testRemovePipeline() throws IOException { public void testRemovePipeline() throws IOException {
SCMPipelineManager pipelineManager = SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue()); new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf); pipelineManager.getStateManager(), conf);
@ -134,8 +134,7 @@ public class TestSCMPipelineManager {
// new pipeline manager should not be able to load removed pipelines // new pipeline manager should not be able to load removed pipelines
pipelineManager = pipelineManager =
new SCMPipelineManager(conf, nodeManager, new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
new EventQueue());
try { try {
pipelineManager.getPipeline(pipeline.getId()); pipelineManager.getPipeline(pipeline.getId());
Assert.fail("Pipeline should not have been retrieved"); Assert.fail("Pipeline should not have been retrieved");
@ -151,7 +150,7 @@ public class TestSCMPipelineManager {
public void testPipelineReport() throws IOException { public void testPipelineReport() throws IOException {
EventQueue eventQueue = new EventQueue(); EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager = SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, eventQueue); new SCMPipelineManager(conf, nodeManager, eventQueue, null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf); pipelineManager.getStateManager(), conf);
@ -218,7 +217,7 @@ public class TestSCMPipelineManager {
MockNodeManager nodeManagerMock = new MockNodeManager(true, MockNodeManager nodeManagerMock = new MockNodeManager(true,
20); 20);
SCMPipelineManager pipelineManager = SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManagerMock, new EventQueue()); new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
PipelineProvider mockRatisProvider = PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock, new MockRatisPipelineProvider(nodeManagerMock,
pipelineManager.getStateManager(), conf); pipelineManager.getStateManager(), conf);
@ -273,7 +272,7 @@ public class TestSCMPipelineManager {
@Test @Test
public void testActivateDeactivatePipeline() throws IOException { public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager = final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue()); new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
final PipelineProvider mockRatisProvider = final PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager, new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf); pipelineManager.getStateManager(), conf);

View File

@ -83,6 +83,8 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.HddsVersionInfo; import org.apache.hadoop.hdds.utils.HddsVersionInfo;
import org.junit.Assert; import org.junit.Assert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -90,8 +92,6 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -100,12 +100,7 @@ import com.google.common.collect.Maps;
* Test class that exercises the StorageContainerManager. * Test class that exercises the StorageContainerManager.
*/ */
public class TestStorageContainerManager { public class TestStorageContainerManager {
private static XceiverClientManager xceiverClientManager = private static XceiverClientManager xceiverClientManager;
new XceiverClientManager(
new OzoneConfiguration());
private static final Logger LOG = LoggerFactory.getLogger(
TestStorageContainerManager.class);
/** /**
* Set the timeout for every test. * Set the timeout for every test.
*/ */
@ -121,6 +116,18 @@ public class TestStorageContainerManager {
@Rule @Rule
public TemporaryFolder folder= new TemporaryFolder(); public TemporaryFolder folder= new TemporaryFolder();
@BeforeClass
public static void setup() throws IOException {
xceiverClientManager = new XceiverClientManager(new OzoneConfiguration());
}
@AfterClass
public static void cleanup() {
if (xceiverClientManager != null) {
xceiverClientManager.close();
}
}
@Test @Test
public void testRpcPermission() throws Exception { public void testRpcPermission() throws Exception {
// Test with default configuration // Test with default configuration

View File

@ -61,7 +61,7 @@ public class CertificateClientTestImpl implements CertificateClient {
.setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS)) .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
.setClusterID("cluster1") .setClusterID("cluster1")
.setKey(keyPair) .setKey(keyPair)
.setSubject("TestCertSub") .setSubject("localhost")
.setConfiguration(config) .setConfiguration(config)
.setScmID("TestScmId1") .setScmID("TestScmId1")
.makeCA(); .makeCA();
@ -98,6 +98,11 @@ public class CertificateClientTestImpl implements CertificateClient {
return x509Certificate; return x509Certificate;
} }
@Override
public X509Certificate getCACertificate() {
return x509Certificate;
}
@Override @Override
public boolean verifyCertificate(X509Certificate certificate) { public boolean verifyCertificate(X509Certificate certificate) {
return true; return true;

View File

@ -155,9 +155,13 @@ public class TestContainerReplicationEndToEnd {
.getPipeline(pipelineID); .getPipeline(pipelineID);
key.close(); key.close();
if (cluster.getStorageContainerManager().getContainerManager()
.getContainer(new ContainerID(containerID)).getState() !=
HddsProtos.LifeCycleState.CLOSING) {
cluster.getStorageContainerManager().getContainerManager() cluster.getStorageContainerManager().getContainerManager()
.updateContainerState(new ContainerID(containerID), .updateContainerState(new ContainerID(containerID),
HddsProtos.LifeCycleEvent.FINALIZE); HddsProtos.LifeCycleEvent.FINALIZE);
}
// wait for container to move to OPEN state in SCM // wait for container to move to OPEN state in SCM
Thread.sleep(2 * containerReportInterval); Thread.sleep(2 * containerReportInterval);
DatanodeDetails oldReplicaNode = pipeline.getFirstNode(); DatanodeDetails oldReplicaNode = pipeline.getFirstNode();

View File

@ -18,14 +18,17 @@
package org.apache.hadoop.ozone.container.ozoneimpl; package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc; import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
@ -33,13 +36,10 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ThreadUtil; import org.junit.*;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -49,13 +49,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME_DEFAULT; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME_DEFAULT;
@ -81,34 +79,24 @@ public class TestOzoneContainerWithTLS {
public TemporaryFolder tempFolder = new TemporaryFolder(); public TemporaryFolder tempFolder = new TemporaryFolder();
private OzoneConfiguration conf; private OzoneConfiguration conf;
private SecurityConfig secConfig; private OzoneBlockTokenSecretManager secretManager;
private Boolean requireMutualTls; private CertificateClientTestImpl caClient;
private boolean blockTokenEnabled;
public TestOzoneContainerWithTLS(Boolean requireMutualTls) {
this.requireMutualTls = requireMutualTls;
public TestOzoneContainerWithTLS(boolean blockTokenEnabled) {
this.blockTokenEnabled = blockTokenEnabled;
} }
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> encryptionOptions() { public static Collection<Object[]> enableBlockToken() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
{true}, {false},
{false} {true}
}); });
} }
private void copyResource(String inputResourceName, File outputFile) throws
IOException {
InputStream is = ThreadUtil.getResourceAsStream(inputResourceName);
try (OutputStream os = new FileOutputStream(outputFile)) {
IOUtils.copy(is, os);
} finally {
IOUtils.closeQuietly(is);
}
}
@Before @Before
public void setup() throws IOException{ public void setup() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
String ozoneMetaPath = String ozoneMetaPath =
GenericTestUtils.getTempPath("ozoneMeta"); GenericTestUtils.getTempPath("ozoneMeta");
@ -125,21 +113,24 @@ public class TestOzoneContainerWithTLS {
conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_ENABLED, true); conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_ENABLED, true);
conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT, true); conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT, true);
secConfig = new SecurityConfig(conf);
copyResource("ssl/ca.crt", secConfig.getTrustStoreFile()); long expiryTime = conf.getTimeDuration(
copyResource("ssl/server.pem", secConfig.getServerPrivateKeyFile()); HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
copyResource("ssl/client.pem", secConfig.getClientPrivateKeyFile()); HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
copyResource("ssl/client.crt", secConfig.getClientCertChainFile()); TimeUnit.MILLISECONDS);
copyResource("ssl/server.crt", secConfig.getServerCertChainFile());
caClient = new CertificateClientTestImpl(conf);
secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
expiryTime, caClient.getCertificate().
getSerialNumber().toString());
} }
@Test @Test
public void testCreateOzoneContainer() throws Exception { public void testCreateOzoneContainer() throws Exception {
LOG.info("testCreateOzoneContainer with Mutual TLS: {}", LOG.info("testCreateOzoneContainer with TLS and blockToken enabled: {}",
requireMutualTls); blockTokenEnabled);
conf.setBoolean(HddsConfigKeys.HDDS_GRPC_MUTUAL_TLS_REQUIRED, conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED,
requireMutualTls); blockTokenEnabled);
long containerID = ContainerTestHelper.getTestContainerID(); long containerID = ContainerTestHelper.getTestContainerID();
OzoneContainer container = null; OzoneContainer container = null;
@ -154,13 +145,25 @@ public class TestOzoneContainerWithTLS {
conf.setBoolean( conf.setBoolean(
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false); OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
container = new OzoneContainer(dn, conf, getContext(dn), null); container = new OzoneContainer(dn, conf, getContext(dn), caClient);
//Set scmId and manually start ozone container. //Set scmId and manually start ozone container.
container.start(UUID.randomUUID().toString()); container.start(UUID.randomUUID().toString());
XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf); XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf,
client.connect(); caClient.getCACertificate());
if (blockTokenEnabled) {
secretManager.start(caClient);
Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
"123", EnumSet.allOf(
HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
RandomUtils.nextLong());
client.connect(token.encodeToUrlString());
createSecureContainerForTesting(client, containerID, token);
} else {
createContainerForTesting(client, containerID); createContainerForTesting(client, containerID);
client.connect();
}
} finally { } finally {
if (container != null) { if (container != null) {
container.stop(); container.stop();
@ -170,7 +173,6 @@ public class TestOzoneContainerWithTLS {
public static void createContainerForTesting(XceiverClientSpi client, public static void createContainerForTesting(XceiverClientSpi client,
long containerID) throws Exception { long containerID) throws Exception {
// Create container
ContainerProtos.ContainerCommandRequestProto request = ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest( ContainerTestHelper.getCreateContainerRequest(
containerID, client.getPipeline()); containerID, client.getPipeline());
@ -179,6 +181,18 @@ public class TestOzoneContainerWithTLS {
Assert.assertNotNull(response); Assert.assertNotNull(response);
} }
public static void createSecureContainerForTesting(XceiverClientSpi client,
long containerID, Token<OzoneBlockTokenIdentifier> token)
throws Exception {
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerSecureRequest(
containerID, client.getPipeline(), token);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
}
private StateContext getContext(DatanodeDetails datanodeDetails) { private StateContext getContext(DatanodeDetails datanodeDetails) {
DatanodeStateMachine stateMachine = Mockito.mock( DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class); DatanodeStateMachine.class);

View File

@ -115,7 +115,7 @@ public class TestXceiverClientManager {
TestXceiverClientManager.class.getName() + UUID.randomUUID()); TestXceiverClientManager.class.getName() + UUID.randomUUID());
conf.set(HDDS_METADATA_DIR_NAME, metaDir); conf.set(HDDS_METADATA_DIR_NAME, metaDir);
XceiverClientManager clientManager = XceiverClientManager clientManager =
new XceiverClientManager(conf, clientConfig); new XceiverClientManager(conf, clientConfig, null);
Cache<String, XceiverClientSpi> cache = Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();
@ -173,7 +173,7 @@ public class TestXceiverClientManager {
TestXceiverClientManager.class.getName() + UUID.randomUUID()); TestXceiverClientManager.class.getName() + UUID.randomUUID());
conf.set(HDDS_METADATA_DIR_NAME, metaDir); conf.set(HDDS_METADATA_DIR_NAME, metaDir);
XceiverClientManager clientManager = XceiverClientManager clientManager =
new XceiverClientManager(conf, clientConfig); new XceiverClientManager(conf, clientConfig, null);
Cache<String, XceiverClientSpi> cache = Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();
@ -222,7 +222,7 @@ public class TestXceiverClientManager {
ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class); ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
clientConfig.setMaxSize(1); clientConfig.setMaxSize(1);
XceiverClientManager clientManager = XceiverClientManager clientManager =
new XceiverClientManager(conf, clientConfig); new XceiverClientManager(conf, clientConfig, null);
Cache<String, XceiverClientSpi> cache = Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.om; package org.apache.hadoop.ozone.om;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.sun.codemodel.internal.JExpression;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;

View File

@ -117,6 +117,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
@ -240,6 +241,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OzoneDelegationTokenSecretManager delegationTokenMgr; private OzoneDelegationTokenSecretManager delegationTokenMgr;
private OzoneBlockTokenSecretManager blockTokenMgr; private OzoneBlockTokenSecretManager blockTokenMgr;
private CertificateClient certClient; private CertificateClient certClient;
private String caCertPem = null;
private static boolean testSecureOmFlag = false; private static boolean testSecureOmFlag = false;
private final Text omRpcAddressTxt; private final Text omRpcAddressTxt;
private final OzoneConfiguration configuration; private final OzoneConfiguration configuration;
@ -1254,6 +1256,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
metadataManager.start(configuration); metadataManager.start(configuration);
startSecretManagerIfNecessary(); startSecretManagerIfNecessary();
if (certClient != null) {
caCertPem = CertificateCodec.getPEMEncodedString(
certClient.getCACertificate());
}
// Set metrics and start metrics back ground thread // Set metrics and start metrics back ground thread
metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
.getVolumeTable())); .getVolumeTable()));
@ -2591,6 +2597,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return services; return services;
} }
  @Override
  public ServiceInfoEx getServiceInfo() throws IOException {
    // Wraps the plain service list together with the CA certificate PEM.
    // caCertPem stays null when no certificate client was initialized
    // (i.e. security is disabled) — presumably callers use it to bootstrap
    // TLS trust; confirm against OM client code.
    return new ServiceInfoEx(getServiceList(), caCertPem);
  }
@Override @Override
/** /**
* {@inheritDoc} * {@inheritDoc}

View File

@ -761,10 +761,12 @@ public class OzoneManagerRequestHandler implements RequestHandler {
throws IOException { throws IOException {
ServiceListResponse.Builder resp = ServiceListResponse.newBuilder(); ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
resp.addAllServiceInfo(impl.getServiceList().stream() resp.addAllServiceInfo(impl.getServiceInfo().getServiceInfoList().stream()
.map(ServiceInfo::getProtobuf) .map(ServiceInfo::getProtobuf)
.collect(Collectors.toList())); .collect(Collectors.toList()));
if (impl.getServiceInfo().getCaCertificate() != null) {
resp.setCaCertificate(impl.getServiceInfo().getCaCertificate());
}
return resp.build(); return resp.build();
} }