HBASE-21018 RS crashed because AsyncFS was unable to update HDFS data encryption key
This commit is contained in:
parent
ee164fcbc5
commit
5e12d6a98e
|
@ -367,16 +367,20 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
|
|
||||||
private final Promise<Void> promise;
|
private final Promise<Void> promise;
|
||||||
|
|
||||||
|
private final DFSClient dfsClient;
|
||||||
|
|
||||||
private int step = 0;
|
private int step = 0;
|
||||||
|
|
||||||
public SaslNegotiateHandler(Configuration conf, String username, char[] password,
|
public SaslNegotiateHandler(Configuration conf, String username, char[] password,
|
||||||
Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException {
|
Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
|
||||||
|
DFSClient dfsClient) throws SaslException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.saslProps = saslProps;
|
this.saslProps = saslProps;
|
||||||
this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
|
this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
|
||||||
SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
|
SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
|
||||||
this.timeoutMs = timeoutMs;
|
this.timeoutMs = timeoutMs;
|
||||||
this.promise = promise;
|
this.promise = promise;
|
||||||
|
this.dfsClient = dfsClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
|
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
|
||||||
|
@ -434,6 +438,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
|
|
||||||
private void check(DataTransferEncryptorMessageProto proto) throws IOException {
|
private void check(DataTransferEncryptorMessageProto proto) throws IOException {
|
||||||
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
|
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
|
||||||
|
dfsClient.clearDataEncryptionKey();
|
||||||
throw new InvalidEncryptionKeyException(proto.getMessage());
|
throw new InvalidEncryptionKeyException(proto.getMessage());
|
||||||
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
|
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
|
||||||
throw new IOException(proto.getMessage());
|
throw new IOException(proto.getMessage());
|
||||||
|
@ -737,12 +742,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
|
private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
|
||||||
String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) {
|
String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
|
||||||
|
DFSClient dfsClient) {
|
||||||
try {
|
try {
|
||||||
channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
|
channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
|
||||||
new ProtobufVarint32FrameDecoder(),
|
new ProtobufVarint32FrameDecoder(),
|
||||||
new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
|
new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
|
||||||
new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise));
|
new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
|
||||||
|
dfsClient));
|
||||||
} catch (SaslException e) {
|
} catch (SaslException e) {
|
||||||
saslPromise.tryFailure(e);
|
saslPromise.tryFailure(e);
|
||||||
}
|
}
|
||||||
|
@ -769,7 +776,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
}
|
}
|
||||||
doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
|
doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
|
||||||
encryptionKeyToPassword(encryptionKey.encryptionKey),
|
encryptionKeyToPassword(encryptionKey.encryptionKey),
|
||||||
createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise);
|
createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
|
||||||
|
client);
|
||||||
} else if (!UserGroupInformation.isSecurityEnabled()) {
|
} else if (!UserGroupInformation.isSecurityEnabled()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
|
LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
|
||||||
|
@ -794,7 +802,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
"SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
|
"SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
|
||||||
}
|
}
|
||||||
doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
|
doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
|
||||||
buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise);
|
buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
|
||||||
|
client);
|
||||||
} else {
|
} else {
|
||||||
// It's a secured cluster using non-privileged ports, but no SASL. The only way this can
|
// It's a secured cluster using non-privileged ports, but no SASL. The only way this can
|
||||||
// happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
|
// happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
|
||||||
|
|
Loading…
Reference in New Issue