HDDS-597. Ratis: Support secure gRPC endpoint with mTLS for Ratis. Contributed by Ajay Kumar.

This commit is contained in:
Ajay Kumar 2019-01-11 10:32:06 -08:00 committed by Xiaoyu Yao
parent 140565f4db
commit 01a7f9ed35
9 changed files with 120 additions and 41 deletions

View File

@ -133,7 +133,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken));
if (SecurityConfig.isGrpcTlsEnabled(config)) {
if (secConfig.isGrpcTlsEnabled()) {
File trustCertCollectionFile = secConfig.getTrustStoreFile();
File privateKeyFile = secConfig.getClientPrivateKeyFile();
File clientCertChainFile = secConfig.getClientCertChainFile();

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
@ -69,9 +71,11 @@ public final class XceiverClientRatis extends XceiverClientSpi {
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
retryPolicy);
retryPolicy, tlsConfig);
}
private final Pipeline pipeline;
@ -79,6 +83,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
private final AtomicReference<RaftClient> client = new AtomicReference<>();
private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;
// Map to track commit index at every server
private final ConcurrentHashMap<String, Long> commitInfoMap;
@ -90,7 +95,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
int maxOutStandingChunks, RetryPolicy retryPolicy) {
int maxOutStandingChunks, RetryPolicy retryPolicy,
GrpcTlsConfig tlsConfig) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
@ -98,6 +104,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
watchClient = null;
this.tlsConfig = tlsConfig;
}
private void updateCommitInfosMap(
@ -145,7 +152,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) {
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig))) {
throw new IllegalStateException("Client is already connected.");
}
}
@ -211,7 +219,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// create a new RaftClient instance for watch request
if (watchClient == null) {
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig);
}
CompletableFuture<RaftClientReply> replyFuture = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
@ -229,7 +238,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// here once the watch request bypassing sliding window in Raft Client
// gets fixed.
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
maxOutstandingRequests, tlsConfig);
reply = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get(timeout, TimeUnit.MILLISECONDS);

View File

@ -98,16 +98,17 @@ public class SecurityConfig {
private final String publicKeyFileName;
private final Duration certDuration;
private final String x509SignatureAlgo;
private final Boolean grpcBlockTokenEnabled;
private final boolean grpcBlockTokenEnabled;
private final String certificateDir;
private final String certificateFileName;
private final Boolean grpcTlsEnabled;
private Boolean grpcTlsUseTestCert;
private final boolean grpcTlsEnabled;
private boolean grpcTlsUseTestCert;
private String trustStoreFileName;
private String serverCertChainFileName;
private String clientCertChainFileName;
private final Duration defaultCertDuration;
private final boolean isSecurityEnabled;
private boolean grpcMutualTlsRequired;
/**
* Constructs a SecurityConfig.
@ -152,7 +153,10 @@ public class SecurityConfig {
this.grpcTlsEnabled = this.configuration.getBoolean(HDDS_GRPC_TLS_ENABLED,
HDDS_GRPC_TLS_ENABLED_DEFAULT);
if (grpcTlsEnabled) {
this.grpcMutualTlsRequired = configuration.getBoolean(
HDDS_GRPC_MUTUAL_TLS_REQUIRED, HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT);
this.trustStoreFileName = this.configuration.get(
HDDS_TRUST_STORE_FILE_NAME, HDDS_TRUST_STORE_FILE_NAME_DEFAULT);
@ -353,27 +357,24 @@ public class SecurityConfig {
return this.certDuration;
}
public Boolean isGrpcBlockTokenEnabled() {
public boolean isGrpcBlockTokenEnabled() {
return this.grpcBlockTokenEnabled;
}
/**
* Returns true if TLS is enabled for gRPC services.
* @param conf configuration
* @return true if TLS is enabled for gRPC services.
*/
public static Boolean isGrpcTlsEnabled(Configuration conf) {
return conf.getBoolean(HDDS_GRPC_TLS_ENABLED,
HDDS_GRPC_TLS_ENABLED_DEFAULT);
public boolean isGrpcTlsEnabled() {
return this.grpcTlsEnabled;
}
/**
* Returns true if TLS mutual authentication is enabled for gRPC services.
* @return true if TLS is enabled for gRPC services.
*/
public Boolean isGrpcMutualTlsRequired() {
return configuration.getBoolean(HDDS_GRPC_MUTUAL_TLS_REQUIRED,
HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT);
public boolean isGrpcMutualTlsRequired() {
return this.grpcMutualTlsRequired;
}
/**

View File

@ -21,11 +21,14 @@ package org.apache.ratis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@ -33,6 +36,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.util.SizeInBytes;
@ -128,38 +132,77 @@ public interface RatisHelper {
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy) throws IOException {
RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig) throws IOException {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
pipeline.getNodes()), retryPolicy);
pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy) {
RetryPolicy retryPolicy, int maxOutstandingRequests,
GrpcTlsConfig tlsConfig) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy);
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
maxOutstandingRequests, tlsConfig);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RaftGroup group, RetryPolicy retryPolicy) {
return newRaftClient(rpcType, leader.getId(), group, retryPolicy);
RetryPolicy retryPolicy, int maxOutstandingRequests) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
maxOutstandingRequests, null);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
RaftGroup group, RetryPolicy retryPolicy) {
RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
return RaftClient.newBuilder()
GrpcConfigKeys.OutputStream.setOutstandingAppendsMax(properties,
maxOutStandingRequest);
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(group)
.setLeaderId(leader)
.setProperties(properties)
.setRetryPolicy(retryPolicy)
.build();
.setRetryPolicy(retryPolicy);
// TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later.
if (tlsConfig != null && rpcType == SupportedRpcType.GRPC) {
builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
}
return builder.build();
}
static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf) {
if (conf.isGrpcTlsEnabled()) {
if (conf.isGrpcMutualTlsRequired()) {
return new GrpcTlsConfig(
null, null, conf.getTrustStoreFile(), false);
} else {
return new GrpcTlsConfig(conf.getClientPrivateKeyFile(),
conf.getClientCertChainFile(), conf.getTrustStoreFile(), true);
}
}
return null;
}
static GrpcTlsConfig createTlsServerConfig(SecurityConfig conf) {
if (conf.isGrpcTlsEnabled()) {
if (conf.isGrpcMutualTlsRequired()) {
return new GrpcTlsConfig(
conf.getServerPrivateKeyFile(), conf.getServerCertChainFile(), null,
false);
} else {
return new GrpcTlsConfig(conf.getServerPrivateKeyFile(),
conf.getServerCertChainFile(), conf.getClientCertChainFile(), true);
}
}
return null;
}
static RetryPolicy createRetryPolicy(Configuration conf) {

View File

@ -115,7 +115,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
nettyServerBuilder.addService(service);
}
if (SecurityConfig.isGrpcTlsEnabled(conf)) {
if (secConfig.isGrpcTlsEnabled()) {
File privateKeyFilePath = secConfig.getServerPrivateKeyFile();
File serverCertChainFilePath = secConfig.getServerCertChainFile();
File clientCertChainFilePath = secConfig.getClientCertChainFile();

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -44,6 +45,8 @@ import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.Message;
@ -66,6 +69,7 @@ import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HEAD;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -107,9 +111,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private long nodeFailureTimeoutMs;
private final long cacheEntryExpiryInteval;
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
ContainerDispatcher dispatcher, Configuration conf, StateContext
context, GrpcTlsConfig tlsConfig)
throws IOException {
Objects.requireNonNull(dd, "id == null");
this.port = port;
@ -139,12 +143,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
for (int i = 0; i < numContainerOpExecutors; i++) {
executors.add(Executors.newSingleThreadExecutor());
}
this.server = RaftServer.newBuilder()
RaftServer.Builder builder = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
.setStateMachineRegistry(this::getStateMachine)
.build();
.setStateMachineRegistry(this::getStateMachine);
if (tlsConfig != null) {
builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig));
}
this.server = builder.build();
}
private ContainerStateMachine getStateMachine(RaftGroupId gid) {
@ -405,10 +411,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
+ "fallback to use default port {}", localPort, e);
}
}
GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
new SecurityConfig(ozoneConf));
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context);
dispatcher, ozoneConf, context, tlsConfig);
}
@Override

View File

@ -242,8 +242,9 @@ public class TestCloseContainerCommandHandler {
final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails);
final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
Collections.singleton(datanodeDetails));
final RaftClient client = RatisHelper.newRaftClient(
SupportedRpcType.GRPC, peer, retryPolicy);
final int maxOutstandingRequests = 100;
final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC,
peer, retryPolicy, maxOutstandingRequests, null);
Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
Thread.sleep(2000);
final ContainerID containerId = ContainerID.valueof(

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@ -135,9 +138,13 @@ public final class RatisPipelineUtils {
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final RaftPeer p = RatisHelper.toRaftPeer(dn);
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
new SecurityConfig(ozoneConf));
RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy);
retryPolicy, maxOutstandingRequests, tlsConfig);
client
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
}
@ -156,12 +163,16 @@ public final class RatisPipelineUtils {
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final List<IOException> exceptions =
Collections.synchronizedList(new ArrayList<>());
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy)) {
retryPolicy, maxOutstandingRequests, tlsConfig)) {
rpc.accept(client, p);
} catch (IOException ioe) {
exceptions.add(

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
@ -41,6 +42,7 @@ import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.ratis.RatisHelper.newRaftClient;
/**
* Helpers for Ratis tests.
@ -122,8 +124,11 @@ public interface RatisTestHelper {
RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(dd);
final OzoneConfiguration conf = new OzoneConfiguration();
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(conf);
final RaftClient client =
RatisHelper.newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf));
newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
maxOutstandingRequests);
client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
}
}