HBASE-16414 Improve performance for RPC encryption with Apache Common

Crypto (Colin Ma)
This commit is contained in:
Ramkrishna 2016-10-21 16:02:39 +05:30
parent d3decaab8e
commit 0ae211eb39
19 changed files with 2887 additions and 52 deletions

View File

@ -60,12 +60,14 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@ -111,6 +113,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
private byte[] connectionHeaderWithLength;
private boolean waitingConnectionHeaderResponse = false;
/**
* If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
* java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
@ -349,7 +353,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
throws IOException {
saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
return saslRpcClient.saslConnect(in2, out2);
}
@ -462,8 +467,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
}
if (continueSasl) {
// Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream);
outStream = saslRpcClient.getOutputStream(outStream);
inStream = saslRpcClient.getInputStream();
outStream = saslRpcClient.getOutputStream();
} else {
// fall back to simple auth because server told us so.
// do not change authMethod and useSasl here, we should start from secure when
@ -474,6 +479,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
// Now write out the connection header
writeConnectionHeader();
// process the response from server for connection header if necessary
processResponseForConnectionHeader();
break;
}
} catch (Throwable t) {
@ -511,10 +519,60 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
* Write the connection header.
*/
private void writeConnectionHeader() throws IOException {
boolean isCryptoAesEnable = false;
// check if Crypto AES is enabled
if (saslRpcClient != null) {
boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
isCryptoAesEnable = saslEncryptionEnabled && conf.getBoolean(
CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
}
// if Crypto AES is enabled, set transformation and negotiate with server
if (isCryptoAesEnable) {
waitingConnectionHeaderResponse = true;
}
this.out.write(connectionHeaderWithLength);
this.out.flush();
}
private void processResponseForConnectionHeader() throws IOException {
// if no response excepted, return
if (!waitingConnectionHeaderResponse) return;
try {
// read the ConnectionHeaderResponse from server
int len = this.in.readInt();
byte[] buff = new byte[len];
int readSize = this.in.read(buff);
if (LOG.isDebugEnabled()) {
LOG.debug("Length of response for connection header:" + readSize);
}
RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
RPCProtos.ConnectionHeaderResponse.parseFrom(buff);
// Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
if (connectionHeaderResponse.hasCryptoCipherMeta()) {
negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
}
waitingConnectionHeaderResponse = false;
} catch (SocketTimeoutException ste) {
LOG.fatal("Can't get the connection header response for rpc timeout, please check if" +
" server has the correct configuration to support the additional function.", ste);
// timeout when waiting the connection header response, ignore the additional function
throw new IOException("Timeout while waiting connection header response", ste);
}
}
private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
throws IOException {
// initilize the Crypto AES with CryptoCipherMeta
saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
// reset the inputStream/outputStream for Crypto AES encryption
this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
}
private void tracedWriteRequest(Call call) throws IOException {
try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) {
writeRequest(call);

View File

@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import io.netty.bootstrap.Bootstrap;
@ -55,7 +57,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
@ -130,8 +131,7 @@ class NettyRpcConnection extends RpcConnection {
}
}
private void established(Channel ch) {
ch.write(connectionHeaderWithLength.retainedDuplicate());
private void established(Channel ch) throws IOException {
ChannelPipeline p = ch.pipeline();
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
p.addBefore(addBeforeHandler, null,
@ -188,11 +188,10 @@ class NettyRpcConnection extends RpcConnection {
return;
}
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
ChannelHandler saslHandler;
final NettyHBaseSaslRpcClientHandler saslHandler;
try {
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
"hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf);
} catch (IOException e) {
failInit(ch, e);
return;
@ -206,7 +205,41 @@ class NettyRpcConnection extends RpcConnection {
ChannelPipeline p = ch.pipeline();
p.remove(SaslChallengeDecoder.class);
p.remove(NettyHBaseSaslRpcClientHandler.class);
established(ch);
// check if negotiate with server for connection header is necessary
if (saslHandler.isNeedProcessConnectionHeader()) {
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
// create the handler to handle the connection header
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
connectionHeaderPromise, conf, connectionHeaderWithLength);
// add ReadTimeoutHandler to deal with server doesn't response connection header
// because of the different configuration in client side and server side
p.addFirst(new ReadTimeoutHandler(
RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
p.addLast(chHandler);
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(ReadTimeoutHandler.class);
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
// sent it already
established(ch);
} else {
final Throwable error = future.cause();
scheduleRelogin(error);
failInit(ch, toIOE(error));
}
}
});
} else {
// send the connection header to server
ch.write(connectionHeaderWithLength.retainedDuplicate());
established(ch);
}
} else {
final Throwable error = future.cause();
scheduleRelogin(error);
@ -240,6 +273,8 @@ class NettyRpcConnection extends RpcConnection {
if (useSasl) {
saslNegotiate(ch);
} else {
// send the connection header to server
ch.write(connectionHeaderWithLength.retainedDuplicate());
established(ch);
}
}

View File

@ -72,6 +72,12 @@ abstract class RpcConnection {
protected final HashedWheelTimer timeoutTimer;
protected final Configuration conf;
protected static String CRYPTO_AES_ENABLED_KEY = "hbase.rpc.crypto.encryption.aes.enabled";
protected static boolean CRYPTO_AES_ENABLED_DEFAULT = false;
// the last time we were picked up from connection pool.
protected long lastTouched;
@ -84,6 +90,7 @@ abstract class RpcConnection {
this.timeoutTimer = timeoutTimer;
this.codec = codec;
this.compressor = compressor;
this.conf = conf;
UserGroupInformation ticket = remoteId.getTicket().getUGI();
SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
@ -224,6 +231,12 @@ abstract class RpcConnection {
builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
}
builder.setVersionInfo(ProtobufUtil.getVersionInfo());
boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
// if Crypto AES enable, setup Cipher transformation
if (isCryptoAESEnable) {
builder.setRpcCryptoCipherTransformation(
conf.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding"));
}
return builder.build();
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class UnsupportedCryptoException extends FatalConnectionException {
public UnsupportedCryptoException() {
super();
}
public UnsupportedCryptoException(String msg) {
super(msg);
}
public UnsupportedCryptoException(String msg, Throwable t) {
super(msg, t);
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
/**
* Unwrap messages with Crypto AES. Should be placed after a
* {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder}
*/
@InterfaceAudience.Private
public class CryptoAESUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final CryptoAES cryptoAES;
public CryptoAESUnwrapHandler(CryptoAES cryptoAES) {
this.cryptoAES = cryptoAES;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
ctx.fireChannelRead(Unpooled.wrappedBuffer(cryptoAES.unwrap(bytes, 0, bytes.length)));
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import java.io.IOException;
/**
* wrap messages with Crypto AES.
*/
@InterfaceAudience.Private
public class CryptoAESWrapHandler extends ChannelOutboundHandlerAdapter {
private final CryptoAES cryptoAES;
private CoalescingBufferQueue queue;
public CryptoAESWrapHandler(CryptoAES cryptoAES) {
this.cryptoAES = cryptoAES;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue = new CoalescingBufferQueue(ctx.channel());
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof ByteBuf) {
queue.add((ByteBuf) msg, promise);
} else {
ctx.write(msg, promise);
}
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (queue.isEmpty()) {
return;
}
ByteBuf buf = null;
try {
ChannelPromise promise = ctx.newPromise();
int readableBytes = queue.readableBytes();
buf = queue.remove(readableBytes, promise);
byte[] bytes = new byte[readableBytes];
buf.readBytes(bytes);
byte[] wrapperBytes = cryptoAES.wrap(bytes, 0, bytes.length);
ChannelPromise lenPromise = ctx.newPromise();
ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
ChannelPromise contentPromise = ctx.newPromise();
ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
PromiseCombiner combiner = new PromiseCombiner();
combiner.addAll(lenPromise, contentPromise);
combiner.finish(promise);
ctx.flush();
} finally {
if (buf != null) {
ReferenceCountUtil.safeRelease(buf);
}
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (!queue.isEmpty()) {
queue.releaseAndFailAll(new IOException("Connection closed"));
}
ctx.close(promise);
}
}

View File

@ -23,9 +23,11 @@ import java.io.IOException;
import java.security.Key;
import java.security.KeyException;
import java.security.SecureRandom;
import java.util.Properties;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -37,6 +39,8 @@ import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.EncryptionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -255,4 +259,27 @@ public final class EncryptionUtil {
}
return key;
}
/**
* Helper to create an instance of CryptoAES.
*
* @param conf The current configuration.
* @param cryptoCipherMeta The metadata for create CryptoAES.
* @return The instance of CryptoAES.
* @throws IOException if create CryptoAES failed
*/
public static CryptoAES createCryptoAES(RPCProtos.CryptoCipherMeta cryptoCipherMeta,
Configuration conf) throws IOException {
Properties properties = new Properties();
// the property for cipher class
properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
"org.apache.commons.crypto.cipher.JceCipher"));
// create SaslAES for client
return new CryptoAES(cryptoCipherMeta.getTransformation(), properties,
cryptoCipherMeta.getInKey().toByteArray(),
cryptoCipherMeta.getOutKey().toByteArray(),
cryptoCipherMeta.getInIv().toByteArray(),
cryptoCipherMeta.getOutIv().toByteArray());
}
}

View File

@ -22,16 +22,22 @@ import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.SaslInputStream;
@ -47,6 +53,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
private boolean cryptoAesEnable;
private CryptoAES cryptoAES;
private InputStream saslInputStream;
private InputStream cryptoInputStream;
private OutputStream saslOutputStream;
private OutputStream cryptoOutputStream;
private boolean initStreamForCrypto;
public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
String serverPrincipal, boolean fallbackAllowed) throws IOException {
@ -54,8 +67,10 @@ public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
}
public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
String serverPrincipal, boolean fallbackAllowed, String rpcProtection,
boolean initStreamForCrypto) throws IOException {
super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
this.initStreamForCrypto = initStreamForCrypto;
}
private static void readStatus(DataInputStream inStream) throws IOException {
@ -133,6 +148,18 @@ public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
LOG.debug("SASL client context established. Negotiated QoP: "
+ saslClient.getNegotiatedProperty(Sasl.QOP));
}
// initial the inputStream, outputStream for both Sasl encryption
// and Crypto AES encryption if necessary
// if Crypto AES encryption enabled, the saslInputStream/saslOutputStream is
// only responsible for connection header negotiation,
// cryptoInputStream/cryptoOutputStream is responsible for rpc encryption with Crypto AES
saslInputStream = new SaslInputStream(inS, saslClient);
saslOutputStream = new SaslOutputStream(outS, saslClient);
if (initStreamForCrypto) {
cryptoInputStream = new WrappedInputStream(inS);
cryptoOutputStream = new WrappedOutputStream(outS);
}
return true;
} catch (IOException e) {
try {
@ -144,29 +171,112 @@ public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
}
}
public String getSaslQOP() {
return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
}
public void initCryptoCipher(RPCProtos.CryptoCipherMeta cryptoCipherMeta,
Configuration conf) throws IOException {
// create SaslAES for client
cryptoAES = EncryptionUtil.createCryptoAES(cryptoCipherMeta, conf);
cryptoAesEnable = true;
}
/**
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called.
* @param in the InputStream to wrap
* @return a SASL wrapped InputStream
* @throws IOException
*/
public InputStream getInputStream(InputStream in) throws IOException {
public InputStream getInputStream() throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
return new SaslInputStream(in, saslClient);
// If Crypto AES is enabled, return cryptoInputStream which unwrap the data with Crypto AES.
if (cryptoAesEnable && cryptoInputStream != null) {
return cryptoInputStream;
}
return saslInputStream;
}
class WrappedInputStream extends FilterInputStream {
private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
public WrappedInputStream(InputStream in) throws IOException {
super(in);
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int n = read(b, 0, 1);
return (n != -1) ? b[0] : -1;
}
@Override
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
@Override
public synchronized int read(byte[] buf, int off, int len) throws IOException {
// fill the buffer with the next RPC message
if (unwrappedRpcBuffer.remaining() == 0) {
readNextRpcPacket();
}
// satisfy as much of the request as possible
int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
unwrappedRpcBuffer.get(buf, off, readLen);
return readLen;
}
// unwrap messages with Crypto AES
private void readNextRpcPacket() throws IOException {
LOG.debug("reading next wrapped RPC packet");
DataInputStream dis = new DataInputStream(in);
int rpcLen = dis.readInt();
byte[] rpcBuf = new byte[rpcLen];
dis.readFully(rpcBuf);
// unwrap with Crypto AES
rpcBuf = cryptoAES.unwrap(rpcBuf, 0, rpcBuf.length);
if (LOG.isDebugEnabled()) {
LOG.debug("unwrapping token of length:" + rpcBuf.length);
}
unwrappedRpcBuffer = ByteBuffer.wrap(rpcBuf);
}
}
/**
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called.
* @param out the OutputStream to wrap
* @return a SASL wrapped OutputStream
* @throws IOException
*/
public OutputStream getOutputStream(OutputStream out) throws IOException {
public OutputStream getOutputStream() throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
return new SaslOutputStream(out, saslClient);
// If Crypto AES is enabled, return cryptoOutputStream which wrap the data with Crypto AES.
if (cryptoAesEnable && cryptoOutputStream != null) {
return cryptoOutputStream;
}
return saslOutputStream;
}
class WrappedOutputStream extends FilterOutputStream {
public WrappedOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
public void write(byte[] buf, int off, int len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("wrapping token of length:" + len);
}
// wrap with Crypto AES
byte[] wrapped = cryptoAES.wrap(buf, off, len);
DataOutputStream dob = new DataOutputStream(out);
dob.writeInt(wrapped.length);
dob.write(wrapped, 0, wrapped.length);
dob.flush();
}
}
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.concurrent.Promise;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
/**
* Implement logic to deal with the rpc connection header.
*/
@InterfaceAudience.Private
public class NettyHBaseRpcConnectionHeaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final Promise<Boolean> saslPromise;
private final Configuration conf;
private final ByteBuf connectionHeaderWithLength;
public NettyHBaseRpcConnectionHeaderHandler(Promise<Boolean> saslPromise, Configuration conf,
ByteBuf connectionHeaderWithLength) {
this.saslPromise = saslPromise;
this.conf = conf;
this.connectionHeaderWithLength = connectionHeaderWithLength;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// read the ConnectionHeaderResponse from server
int len = msg.readInt();
byte[] buff = new byte[len];
msg.readBytes(buff);
RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
RPCProtos.ConnectionHeaderResponse.parseFrom(buff);
// Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
if (connectionHeaderResponse.hasCryptoCipherMeta()) {
CryptoAES cryptoAES = EncryptionUtil.createCryptoAES(
connectionHeaderResponse.getCryptoCipherMeta(), conf);
// replace the Sasl handler with Crypto AES handler
setupCryptoAESHandler(ctx.pipeline(), cryptoAES);
}
saslPromise.setSuccess(true);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
try {
// send the connection header to server first
ctx.writeAndFlush(connectionHeaderWithLength.retainedDuplicate());
} catch (Exception e) {
// the exception thrown by handlerAdded will not be passed to the exceptionCaught below
// because netty will remove a handler if handlerAdded throws an exception.
exceptionCaught(ctx, e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
saslPromise.tryFailure(cause);
}
/**
* Remove handlers for sasl encryption and add handlers for Crypto AES encryption
*/
private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) {
p.remove(SaslWrapHandler.class);
p.remove(SaslUnwrapHandler.class);
String lengthDecoder = p.context(LengthFieldBasedFrameDecoder.class).name();
p.addAfter(lengthDecoder, null, new CryptoAESUnwrapHandler(cryptoAES));
p.addAfter(lengthDecoder, null, new CryptoAESWrapHandler(cryptoAES));
}
}

View File

@ -47,6 +47,7 @@ public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
if (LOG.isDebugEnabled()) {
LOG.debug("SASL client context established. Negotiated QoP: " + qop);
}
if (qop == null || "auth".equalsIgnoreCase(qop)) {
return;
}
@ -55,4 +56,8 @@ public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new SaslUnwrapHandler(saslClient));
}
public String getSaslQOP() {
return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
}
}

View File

@ -27,6 +27,7 @@ import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
import org.apache.hadoop.security.UserGroupInformation;
@ -47,17 +48,25 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
private final NettyHBaseSaslRpcClient saslRpcClient;
private final Configuration conf;
// flag to mark if Crypto AES encryption is enable
private boolean needProcessConnectionHeader = false;
/**
* @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
* simple.
*/
public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal,
boolean fallbackAllowed, String rpcProtection) throws IOException {
boolean fallbackAllowed, Configuration conf)
throws IOException {
this.saslPromise = saslPromise;
this.ugi = ugi;
this.conf = conf;
this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal,
fallbackAllowed, rpcProtection);
fallbackAllowed, conf.get(
"hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
}
private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
@ -72,10 +81,24 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
if (!saslRpcClient.isComplete()) {
return;
}
saslRpcClient.setupSaslHandler(ctx.pipeline());
setCryptoAESOption();
saslPromise.setSuccess(true);
}
private void setCryptoAESOption() {
boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
needProcessConnectionHeader = saslEncryptionEnabled && conf.getBoolean(
"hbase.rpc.crypto.encryption.aes.enabled", false);
}
public boolean isNeedProcessConnectionHeader() {
return needProcessConnectionHeader;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
try {

View File

@ -88,7 +88,7 @@ public class TestHBaseSaslRpcClient {
DEFAULT_USER_PASSWORD);
for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
"principal/host@DOMAIN.COM", false, qop.name()) {
"principal/host@DOMAIN.COM", false, qop.name(), false) {
public String getQop() {
return saslProps.get(Sasl.QOP);
}
@ -211,14 +211,14 @@ public class TestHBaseSaslRpcClient {
};
try {
rpcClient.getInputStream(Mockito.mock(InputStream.class));
rpcClient.getInputStream();
} catch(IOException ex) {
//Sasl authentication exchange hasn't completed yet
inState = true;
}
try {
rpcClient.getOutputStream(Mockito.mock(OutputStream.class));
rpcClient.getOutputStream();
} catch(IOException ex) {
//Sasl authentication exchange hasn't completed yet
outState = true;

View File

@ -275,6 +275,10 @@
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
</dependencies>
<profiles>

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.crypto.aes;
import org.apache.commons.crypto.cipher.CryptoCipher;
import org.apache.commons.crypto.utils.Utils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.ShortBufferException;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Properties;
/**
* AES encryption and decryption.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoAES {
private final CryptoCipher encryptor;
private final CryptoCipher decryptor;
private final Integrity integrity;
public CryptoAES(String transformation, Properties properties,
byte[] inKey, byte[] outKey, byte[] inIv, byte[] outIv) throws IOException {
checkTransformation(transformation);
// encryptor
encryptor = Utils.getCipherInstance(transformation, properties);
try {
SecretKeySpec outKEYSpec = new SecretKeySpec(outKey, "AES");
IvParameterSpec outIVSpec = new IvParameterSpec(outIv);
encryptor.init(Cipher.ENCRYPT_MODE, outKEYSpec, outIVSpec);
} catch (InvalidKeyException | InvalidAlgorithmParameterException e) {
throw new IOException("Failed to initialize encryptor", e);
}
// decryptor
decryptor = Utils.getCipherInstance(transformation, properties);
try {
SecretKeySpec inKEYSpec = new SecretKeySpec(inKey, "AES");
IvParameterSpec inIVSpec = new IvParameterSpec(inIv);
decryptor.init(Cipher.DECRYPT_MODE, inKEYSpec, inIVSpec);
} catch (InvalidKeyException | InvalidAlgorithmParameterException e) {
throw new IOException("Failed to initialize decryptor", e);
}
integrity = new Integrity(outKey, inKey);
}
/**
* Encrypts input data. The result composes of (msg, padding if needed, mac) and sequence num.
* @param data the input byte array
* @param offset the offset in input where the input starts
* @param len the input length
* @return the new encrypted byte array.
* @throws SaslException if error happens
*/
public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
// mac
byte[] mac = integrity.getHMAC(data, offset, len);
integrity.incMySeqNum();
// encrypt
byte[] encrypted = new byte[len + 10];
try {
int n = encryptor.update(data, offset, len, encrypted, 0);
encryptor.update(mac, 0, 10, encrypted, n);
} catch (ShortBufferException sbe) {
// this should not happen
throw new SaslException("Error happens during encrypt data", sbe);
}
// append seqNum used for mac
byte[] wrapped = new byte[encrypted.length + 4];
System.arraycopy(encrypted, 0, wrapped, 0, encrypted.length);
System.arraycopy(integrity.getSeqNum(), 0, wrapped, encrypted.length, 4);
return wrapped;
}
/**
* Decrypts input data. The input composes of (msg, padding if needed, mac) and sequence num.
* The result is msg.
* @param data the input byte array
* @param offset the offset in input where the input starts
* @param len the input length
* @return the new decrypted byte array.
* @throws SaslException if error happens
*/
public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
// get plaintext and seqNum
byte[] decrypted = new byte[len - 4];
byte[] peerSeqNum = new byte[4];
try {
decryptor.update(data, offset, len - 4, decrypted, 0);
} catch (ShortBufferException sbe) {
// this should not happen
throw new SaslException("Error happens during decrypt data", sbe);
}
System.arraycopy(data, offset + decrypted.length, peerSeqNum, 0, 4);
// get msg and mac
byte[] msg = new byte[decrypted.length - 10];
byte[] mac = new byte[10];
System.arraycopy(decrypted, 0, msg, 0, msg.length);
System.arraycopy(decrypted, msg.length, mac, 0, 10);
// check mac integrity and msg sequence
if (!integrity.compareHMAC(mac, peerSeqNum, msg, 0, msg.length)) {
throw new SaslException("Unmatched MAC");
}
if (!integrity.comparePeerSeqNum(peerSeqNum)) {
throw new SaslException("Out of order sequencing of messages. Got: " + integrity.byteToInt
(peerSeqNum) + " Expected: " + integrity.peerSeqNum);
}
integrity.incPeerSeqNum();
return msg;
}
private void checkTransformation(String transformation) throws IOException {
if ("AES/CTR/NoPadding".equalsIgnoreCase(transformation)) {
return;
}
throw new IOException("AES cipher transformation is not supported: " + transformation);
}
/**
* Helper class for providing integrity protection.
*/
private static class Integrity {
private int mySeqNum = 0;
private int peerSeqNum = 0;
private byte[] seqNum = new byte[4];
private byte[] myKey;
private byte[] peerKey;
Integrity(byte[] outKey, byte[] inKey) throws IOException {
myKey = outKey;
peerKey = inKey;
}
byte[] getHMAC(byte[] msg, int start, int len) throws SaslException {
intToByte(mySeqNum);
return calculateHMAC(myKey, seqNum, msg, start, len);
}
boolean compareHMAC(byte[] expectedHMAC, byte[] peerSeqNum, byte[] msg, int start,
int len) throws SaslException {
byte[] mac = calculateHMAC(peerKey, peerSeqNum, msg, start, len);
return Arrays.equals(mac, expectedHMAC);
}
boolean comparePeerSeqNum(byte[] peerSeqNum) {
return this.peerSeqNum == byteToInt(peerSeqNum);
}
byte[] getSeqNum() {
return seqNum;
}
void incMySeqNum() {
mySeqNum ++;
}
void incPeerSeqNum() {
peerSeqNum ++;
}
private byte[] calculateHMAC(byte[] key, byte[] seqNum, byte[] msg, int start,
int len) throws SaslException {
byte[] seqAndMsg = new byte[4+len];
System.arraycopy(seqNum, 0, seqAndMsg, 0, 4);
System.arraycopy(msg, start, seqAndMsg, 4, len);
try {
SecretKey keyKi = new SecretKeySpec(key, "HmacMD5");
Mac m = Mac.getInstance("HmacMD5");
m.init(keyKi);
m.update(seqAndMsg);
byte[] hMAC_MD5 = m.doFinal();
/* First 10 bytes of HMAC_MD5 digest */
byte macBuffer[] = new byte[10];
System.arraycopy(hMAC_MD5, 0, macBuffer, 0, 10);
return macBuffer;
} catch (InvalidKeyException e) {
throw new SaslException("Invalid bytes used for key of HMAC-MD5 hash.", e);
} catch (NoSuchAlgorithmException e) {
throw new SaslException("Error creating instance of MD5 MAC algorithm", e);
}
}
private void intToByte(int num) {
for(int i = 3; i >= 0; i --) {
seqNum[i] = (byte)(num & 0xff);
num >>>= 8;
}
}
private int byteToInt(byte[] seqNum) {
int answer = 0;
for (int i = 0; i < 4; i ++) {
answer <<= 8;
answer |= ((int)seqNum[i] & 0xff);
}
return answer;
}
}
}

View File

@ -89,6 +89,14 @@ message ConnectionHeader {
// Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
optional string cell_block_compressor_class = 4;
optional VersionInfo version_info = 5;
// the transformation for rpc AES encryption with Apache Commons Crypto
optional string rpc_crypto_cipher_transformation = 6;
}
// This is sent by rpc server to negotiate the data if necessary
message ConnectionHeaderResponse {
// To use Apache Commons Crypto, negotiate the metadata
optional CryptoCipherMeta crypto_cipher_meta = 1;
}
// Optional Cell block Message. Included in client RequestHeader
@ -112,6 +120,17 @@ message ExceptionResponse {
optional bool do_not_retry = 5;
}
/**
* Cipher meta for Crypto
*/
message CryptoCipherMeta {
required string transformation = 1;
optional bytes inKey = 2;
optional bytes inIv = 3;
optional bytes outKey = 4;
optional bytes outIv = 5;
}
// Header sent making a request.
message RequestHeader {
// Monotonically increasing call_id to keep track of RPC requests and their response

View File

@ -45,6 +45,7 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@ -70,6 +72,9 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -91,10 +96,12 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
@ -134,6 +141,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
@ -423,6 +431,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.response = new BufferChain(responseBufs);
}
protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
protected synchronized void setResponse(Object m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
@ -565,9 +579,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
byte [] responseBytes = bc.getBytes();
byte [] token;
// synchronization may be needed since there can be multiple Handler
// threads using saslServer to wrap responses.
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
// threads using saslServer or Crypto AES to wrap responses.
if (connection.useCryptoAesWrap) {
// wrap with Crypto AES
synchronized (connection.cryptoAES) {
token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
}
} else {
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Adding saslServer wrapped token of size " + token.length
@ -1255,7 +1276,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
boolean useSasl;
SaslServer saslServer;
private CryptoAES cryptoAES;
private boolean useWrap = false;
private boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
@ -1266,6 +1289,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
0, null, null, 0);
// Fake 'call' for connection header response
private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, null, 0, null, null, 0);
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
@ -1376,7 +1403,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
processOneRpc(saslToken);
} else {
byte[] b = saslToken.array();
byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
byte [] plaintextData;
if (useCryptoAesWrap) {
// unwrap with CryptoAES
plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit());
} else {
plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
}
processUnwrappedData(plaintextData);
}
} else {
@ -1503,6 +1536,31 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
/**
* Send the response for connection header
*/
private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) throws IOException {
ByteBufferOutputStream response = null;
DataOutputStream out = null;
try {
response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
out = new DataOutputStream(response);
out.writeInt(wrappedCipherMetaData.length);
out.write(wrappedCipherMetaData);
setConnectionHeaderResponseCall.setConnectionHeaderResponse(response.getByteBuffer());
setConnectionHeaderResponseCall.responder = responder;
setConnectionHeaderResponseCall.sendResponseIfReady();
} finally {
if (out != null) {
out.close();
}
if (response != null) {
response.close();
}
}
}
private void disposeSasl() {
if (saslServer != null) {
try {
@ -1744,6 +1802,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.service = getService(services, serviceName);
if (this.service == null) throw new UnknownServiceException(serviceName);
setupCellBlockCodecs(this.connectionHeader);
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
RPCProtos.ConnectionHeaderResponse.newBuilder();
setupCryptoCipher(this.connectionHeader, chrBuilder);
responseConnectionHeader(chrBuilder);
UserGroupInformation protocolUser = createUser(connectionHeader);
if (!useSasl) {
ugi = protocolUser;
@ -1792,8 +1854,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with unknown version info");
}
}
/**
@ -1820,6 +1880,92 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
/**
* Set up cipher for rpc encryption with Apache Commons Crypto
* @throws FatalConnectionException
*/
private void setupCryptoCipher(final ConnectionHeader header,
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws FatalConnectionException {
// If simple auth, return
if (saslServer == null) return;
// check if rpc encryption with Crypto AES
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
.getSaslQop().equalsIgnoreCase(qop);
boolean isCryptoAesEncryption = isEncryption && conf.getBoolean(
"hbase.rpc.crypto.encryption.aes.enabled", false);
if (!isCryptoAesEncryption) return;
if (!header.hasRpcCryptoCipherTransformation()) return;
String transformation = header.getRpcCryptoCipherTransformation();
if (transformation == null || transformation.length() == 0) return;
// Negotiates AES based on complete saslServer.
// The Crypto metadata need to be encrypted and send to client.
Properties properties = new Properties();
// the property for SecureRandomFactory
properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
"org.apache.commons.crypto.random.JavaCryptoRandom"));
// the property for cipher class
properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
"org.apache.commons.crypto.cipher.JceCipher"));
int cipherKeyBits = conf.getInt(
"hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
// generate key and iv
if (cipherKeyBits % 8 != 0) {
throw new IllegalArgumentException("The AES cipher key size in bits" +
" should be a multiple of byte");
}
int len = cipherKeyBits / 8;
byte[] inKey = new byte[len];
byte[] outKey = new byte[len];
byte[] inIv = new byte[len];
byte[] outIv = new byte[len];
try {
// generate the cipher meta data with SecureRandom
CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
secureRandom.nextBytes(inKey);
secureRandom.nextBytes(outKey);
secureRandom.nextBytes(inIv);
secureRandom.nextBytes(outIv);
// create CryptoAES for server
cryptoAES = new CryptoAES(transformation, properties,
inKey, outKey, inIv, outIv);
// create SaslCipherMeta and send to client,
// for client, the [inKey, outKey], [inIv, outIv] should be reversed
RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
ccmBuilder.setTransformation(transformation);
ccmBuilder.setInIv(getByteString(outIv));
ccmBuilder.setInKey(getByteString(outKey));
ccmBuilder.setOutIv(getByteString(inIv));
ccmBuilder.setOutKey(getByteString(inKey));
chrBuilder.setCryptoCipherMeta(ccmBuilder);
useCryptoAesWrap = true;
} catch (GeneralSecurityException | IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException {
// Response the connection header if Crypto AES is enabled
if (!chrBuilder.hasCryptoCipherMeta()) return;
try {
byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
// encrypt the Crypto AES cipher meta data with sasl server, and send to client
byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
} catch (IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
private void processUnwrappedData(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
@ -1858,7 +2004,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
processRequest(buf);
@ -1988,6 +2133,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
private ByteString getByteString(byte[] bytes) {
// return singleton to reduce object allocation
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
}
private boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the

View File

@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConf
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@ -96,8 +97,8 @@ public class TestSecureIPC {
@Parameters(name = "{index}: rpcClientImpl={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
new Object[] { NettyRpcClient.class.getName() });
return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()},
new Object[]{NettyRpcClient.class.getName()});
}
@Parameter
@ -192,6 +193,42 @@ public class TestSecureIPC {
callRpcService(User.create(ugi));
}
/**
* Test sasl encryption with Crypto AES.
* @throws Exception
*/
@Test
public void testSaslWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("true", "true");
callRpcService(User.create(ugi));
}
/**
* Test various combinations of Server and Client configuration for Crypto AES.
* @throws Exception
*/
@Test
public void testDifferentConfWithCryptoAES() throws Exception {
setRpcProtection("privacy", "privacy");
setCryptoAES("false", "true");
callRpcService(User.create(ugi));
setCryptoAES("true", "false");
try {
callRpcService(User.create(ugi));
fail("The exception should be thrown out for the rpc timeout.");
} catch (Exception e) {
// ignore the expected exception
}
}
void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
}
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration cnf = new Configuration();

12
pom.xml
View File

@ -1228,6 +1228,7 @@
<spy.version>2.11.6</spy.version>
<bouncycastle.version>1.46</bouncycastle.version>
<kerby.version>1.0.0-RC2</kerby.version>
<commons-crypto.version>1.0.0</commons-crypto.version>
<!-- Plugin Dependencies -->
<maven.assembly.version>2.4</maven.assembly.version>
<maven.antrun.version>1.8</maven.antrun.version>
@ -1806,6 +1807,17 @@
<artifactId>kerb-simplekdc</artifactId>
<version>${kerby.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
<version>${commons-crypto.version}</version>
<exclusions>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Dependencies needed by subprojects -->