diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index ef6c1ca41d2..a56c3d75e91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -367,16 +367,20 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private final Promise promise; + private final DFSClient dfsClient; + private int step = 0; public SaslNegotiateHandler(Configuration conf, String username, char[] password, - Map saslProps, int timeoutMs, Promise promise) throws SaslException { + Map saslProps, int timeoutMs, Promise promise, + DFSClient dfsClient) throws SaslException { this.conf = conf; this.saslProps = saslProps; this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); this.timeoutMs = timeoutMs; this.promise = promise; + this.dfsClient = dfsClient; } private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { @@ -434,6 +438,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private void check(DataTransferEncryptorMessageProto proto) throws IOException { if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + dfsClient.clearDataEncryptionKey(); throw new InvalidEncryptionKeyException(proto.getMessage()); } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { throw new IOException(proto.getMessage()); @@ -737,12 +742,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, - String username, char[] password, Map saslProps, Promise saslPromise) { + String username, char[] password, Map saslProps, Promise saslPromise, + DFSClient dfsClient) { try { channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), new ProtobufVarint32FrameDecoder(), new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), - new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise)); + new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, + dfsClient)); } catch (SaslException e) { saslPromise.tryFailure(e); } @@ -769,7 +776,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), encryptionKeyToPassword(encryptionKey.encryptionKey), - createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); + createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, + client); } else if (!UserGroupInformation.isSecurityEnabled()) { if (LOG.isDebugEnabled()) { LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr @@ -794,7 +802,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); } doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), - buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); + buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, + client); } else { // It's a secured cluster using non-privileged ports, but no SASL. The only way this can // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare