HBASE-20993. [Auth] IPC client fallback to simple auth allowed doesn't work

Amending-Author: Reid Chan <reidchan@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
jackbearden 2018-08-24 18:57:18 -07:00 committed by Reid Chan
parent c458757f10
commit 8dbb0b0487
9 changed files with 390 additions and 25 deletions

View File

@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@ -440,6 +442,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
// Write out the preamble -- MAGIC, version, and auth to use.
writeConnectionHeaderPreamble(outStream);
readPreambleResponse(inStream);
if (useSasl) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
@ -499,6 +502,25 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
thread.start();
}
private void readPreambleResponse(InputStream inStream) throws IOException {
DataInputStream resultCode = new DataInputStream(new BufferedInputStream(inStream));
int state = resultCode.readInt();
if (state == SaslStatus.SUCCESS.state) {
int fallback = resultCode.readInt();
if (fallback == SaslStatus.SUCCESS.state) {
return;
}
if (fallback == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
if (this.rpcClient.fallbackAllowed) {
useSasl = false;
return;
}
throw new FallbackDisallowedException();
}
}
readResponse();
}
/**
* Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
*/

View File

@ -32,7 +32,7 @@ public class FallbackDisallowedException extends HBaseIOException {
private static final long serialVersionUID = -6942845066279358253L;
public FallbackDisallowedException() {
super("Server asks us to fall back to SIMPLE auth, "
super("Server asked us to fall back to SIMPLE auth, "
+ "but this client is configured to only allow secure connections.");
}
}

View File

@ -181,6 +181,33 @@ class NettyRpcConnection extends RpcConnection {
}
}
private void negotiate(final Channel ch) {
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
Promise<Boolean> preamblePromise = ch.eventLoop().newPromise();
ChannelHandler preambleHandler = new NettyRpcNegotiateHandler(this, preamblePromise,
rpcClient.fallbackAllowed);
ch.pipeline().addFirst(preambleHandler);
preamblePromise.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
ch.pipeline().remove(NettyRpcNegotiateHandler.class);
if (useSasl) {
saslNegotiate(ch);
} else {
established(ch);
}
} else {
final Throwable error = future.cause();
scheduleRelogin(error);
failInit(ch, toIOE(error));
}
}
});
}
private void saslNegotiate(final Channel ch) {
UserGroupInformation ticket = getUGI();
if (ticket == null) {
@ -236,12 +263,7 @@ class NettyRpcConnection extends RpcConnection {
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
return;
}
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
if (useSasl) {
saslNegotiate(ch);
} else {
established(ch);
}
negotiate(ch);
}
}).channel();
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.SaslUtil;
/**
* Implements a Netty RPC client handler for the preamble response from the RPC server
*/
@InterfaceAudience.Private
public class NettyRpcNegotiateHandler extends ChannelDuplexHandler {
private static final Log LOG = LogFactory.getLog(NettyRpcNegotiateHandler.class);
private final RpcConnection conn;
private final Promise<Boolean> promise;
private final boolean fallbackAllowed;
public NettyRpcNegotiateHandler(RpcConnection conn, Promise<Boolean> promise,
boolean fallbackAllowed) {
this.conn = conn;
this.promise = promise;
this.fallbackAllowed = fallbackAllowed;
}
private void attemptToFallback() {
if (fallbackAllowed) {
LOG.info("Server asked us to fall back to SIMPLE auth. Falling back...");
conn.useSasl = false;
} else {
LOG.error("Server asked us to fall back to SIMPLE auth, " +
"but we are not configured for that behavior!");
handleFailure(new FallbackDisallowedException());
}
}
private void handleFailure(Exception e) {
promise.tryFailure(e);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
int status;
try {
status = buf.readInt();
if (status == 0) {
if (buf.readInt() == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
attemptToFallback();
}
promise.trySuccess(true);
}
handleFailure(new IOException("Error while establishing connection to server"));
} catch (Exception e) {
handleFailure(e);
} finally {
buf.release();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
conn.shutdown();
}
}

View File

@ -58,7 +58,7 @@ abstract class RpcConnection {
protected final AuthMethod authMethod;
protected final boolean useSasl;
protected boolean useSasl;
protected final Token<? extends TokenIdentifier> token;

View File

@ -1301,7 +1301,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
private ByteBuffer unwrappedData;
// When is this set? FindBugs wants to know! Says NP
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
@ -1578,6 +1577,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
preambleBuffer.flip();
for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
doRawSaslReply(SaslStatus.ERROR, null, null, null);
return doBadPreambleHandling("Expected HEADER=" +
Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
@ -1588,18 +1588,23 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != CURRENT_VERSION) {
doRawSaslReply(SaslStatus.ERROR, null, null, null);
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new WrongVersionException(msg));
}
if (authMethod == null) {
doRawSaslReply(SaslStatus.ERROR, null, null, null);
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new BadAuthException(msg));
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
// server side uses non-simple auth, client side uses simple auth.
if (allowFallbackToSimpleAuth) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
doRawSaslReply(SaslStatus.ERROR, null, null, null);
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
responder.doRespond(authFailedCall);
@ -1607,16 +1612,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
// server side uses simple auth, client side uses non-simple auth.
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
} else if (authMethod != AuthMethod.SIMPLE) {
// both server and client side use non-simple auth.
useSasl = true;
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
} else if (!isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
// both server and client side use simple auth.
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
}
preambleBuffer = null; // do not need it anymore
@ -1768,11 +1774,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private void process() throws IOException, InterruptedException {
data.flip();
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
return;
}
if (useSasl) {
saslReadAndProcess(data);
} else {

View File

@ -89,6 +89,9 @@ public class TestRpcServerSlowConnectionSetup {
Thread.sleep(5000);
socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
socket.getOutputStream().flush();
DataInputStream pis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
int responseCode = pis.readInt();
assertEquals(responseCode, 0);
ConnectionHeader header = ConnectionHeader.newBuilder()
.setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName())

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({SecurityTests.class, SmallTests.class})
public class TestInsecureIPC {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String PRINCIPAL;
String krbKeytab;
String krbPrincipal;
Configuration clientConf;
Configuration serverConf;
UserGroupInformation ugi;
@Rule
public ExpectedException exception = ExpectedException.none();
@Parameterized.Parameters(name = "{index}: rpcClientImpl={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()},
new Object[]{NettyRpcClient.class.getName()});
}
@Parameterized.Parameter
public String rpcClientImpl;
@BeforeClass
public static void setUp() throws Exception {
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
PRINCIPAL = "hbase/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
}
@AfterClass
public static void tearDown() throws IOException {
if (KDC != null) {
KDC.stop();
}
TEST_UTIL.cleanupTestDir();
}
@Before
public void setUpTest() throws Exception {
krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = getSecuredConfiguration();
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
serverConf = HBaseConfiguration.create();
}
@Test
public void testRpcInsecureClientAgainstInsecureServer() throws Exception {
String clientUsername = "testuser";
UserGroupInformation clientUgi =
UserGroupInformation.createUserForTesting(clientUsername, new String[]{clientUsername});
assertNotSame(ugi, clientUgi);
assertEquals(UserGroupInformation.AuthenticationMethod.SIMPLE,
clientUgi.getAuthenticationMethod());
assertEquals(clientUsername, clientUgi.getUserName());
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
callInsecureRpcService(User.create(clientUgi));
}
@Test
public void testRpcFallbackToSimpleFromKerberosClientAgainstInsecureServer() throws Exception {
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
assertSame(ugi, ugi2);
assertEquals(UserGroupInformation.AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
assertEquals(krbPrincipal, ugi.getUserName());
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
callInsecureRpcService(User.create(ugi2));
}
private void callInsecureRpcService(User clientUser) throws Exception {
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestInsecureIPC",
Lists.newArrayList(
new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE,
null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
TestInsecureIPC.TestThread th1 = new TestInsecureIPC.TestThread(stub);
final Throwable[] exception = new Throwable[1];
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
exception[0] = ex;
}
};
th1.setUncaughtExceptionHandler(exceptionHandler);
th1.start();
th1.join();
if (exception[0] != null) {
// throw root cause.
while (exception[0].getCause() != null) {
exception[0] = exception[0].getCause();
}
throw (Exception) exception[0];
}
} finally {
rpcServer.stop();
}
}
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration cnf = new Configuration();
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(cnf);
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
return UserGroupInformation.getLoginUser();
}
public static class TestThread extends Thread {
private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) {
this.stub = stub;
}
@Override
public void run() {
try {
int[] messageSize = new int[]{100, 1000, 10000};
for (int i = 0; i < messageSize.length; i++) {
String input = RandomStringUtils.random(messageSize[i]);
String result =
stub.echo(null,
TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()).getMessage();
assertEquals(input, result);
}
} catch (ServiceException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -176,13 +176,16 @@ public class TestSecureIPC {
setRpcProtection("authentication", "privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "privacy,authentication");
setRpcProtection("integrity,authentication",
"privacy,authentication");
callRpcService(User.create(ugi));
setRpcProtection("integrity,authentication", "integrity,authentication");
setRpcProtection("integrity,authentication",
"integrity,authentication");
callRpcService(User.create(ugi));
setRpcProtection("privacy,authentication", "privacy,authentication");
setRpcProtection("privacy,authentication",
"privacy,authentication");
callRpcService(User.create(ugi));
}
@ -262,8 +265,8 @@ public class TestSecureIPC {
for (int i = 0; i < messageSize.length; i++) {
String input = RandomStringUtils.random(messageSize[i]);
String result =
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
.getMessage();
stub.echo(null,
TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()).getMessage();
assertEquals(input, result);
}
} catch (ServiceException e) {