Revert "HBASE-20993. [Auth] IPC client fallback to simple auth allowed doesn't work"
Revert reason: It breaks phoenix client.
This reverts commit 8dbb0b0487
.
This commit is contained in:
parent
5f8de7314c
commit
c6a1334528
|
@ -66,8 +66,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
|
||||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
|
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
|
||||||
import org.apache.hadoop.hbase.security.SaslStatus;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
|
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||||
|
@ -442,7 +440,6 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||||
OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
|
OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
|
||||||
// Write out the preamble -- MAGIC, version, and auth to use.
|
// Write out the preamble -- MAGIC, version, and auth to use.
|
||||||
writeConnectionHeaderPreamble(outStream);
|
writeConnectionHeaderPreamble(outStream);
|
||||||
readPreambleResponse(inStream);
|
|
||||||
if (useSasl) {
|
if (useSasl) {
|
||||||
final InputStream in2 = inStream;
|
final InputStream in2 = inStream;
|
||||||
final OutputStream out2 = outStream;
|
final OutputStream out2 = outStream;
|
||||||
|
@ -502,25 +499,6 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||||
thread.start();
|
thread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readPreambleResponse(InputStream inStream) throws IOException {
|
|
||||||
DataInputStream resultCode = new DataInputStream(new BufferedInputStream(inStream));
|
|
||||||
int state = resultCode.readInt();
|
|
||||||
if (state == SaslStatus.SUCCESS.state) {
|
|
||||||
int fallback = resultCode.readInt();
|
|
||||||
if (fallback == SaslStatus.SUCCESS.state) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (fallback == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
|
|
||||||
if (this.rpcClient.fallbackAllowed) {
|
|
||||||
useSasl = false;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
throw new FallbackDisallowedException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
readResponse();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
|
* Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -32,7 +32,7 @@ public class FallbackDisallowedException extends HBaseIOException {
|
||||||
private static final long serialVersionUID = -6942845066279358253L;
|
private static final long serialVersionUID = -6942845066279358253L;
|
||||||
|
|
||||||
public FallbackDisallowedException() {
|
public FallbackDisallowedException() {
|
||||||
super("Server asked us to fall back to SIMPLE auth, "
|
super("Server asks us to fall back to SIMPLE auth, "
|
||||||
+ "but this client is configured to only allow secure connections.");
|
+ "but this client is configured to only allow secure connections.");
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -181,33 +181,6 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void negotiate(final Channel ch) {
|
|
||||||
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
|
||||||
|
|
||||||
Promise<Boolean> preamblePromise = ch.eventLoop().newPromise();
|
|
||||||
ChannelHandler preambleHandler = new NettyRpcNegotiateHandler(this, preamblePromise,
|
|
||||||
rpcClient.fallbackAllowed);
|
|
||||||
ch.pipeline().addFirst(preambleHandler);
|
|
||||||
preamblePromise.addListener(new FutureListener<Boolean>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void operationComplete(Future<Boolean> future) throws Exception {
|
|
||||||
if (future.isSuccess()) {
|
|
||||||
ch.pipeline().remove(NettyRpcNegotiateHandler.class);
|
|
||||||
if (useSasl) {
|
|
||||||
saslNegotiate(ch);
|
|
||||||
} else {
|
|
||||||
established(ch);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
final Throwable error = future.cause();
|
|
||||||
scheduleRelogin(error);
|
|
||||||
failInit(ch, toIOE(error));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void saslNegotiate(final Channel ch) {
|
private void saslNegotiate(final Channel ch) {
|
||||||
UserGroupInformation ticket = getUGI();
|
UserGroupInformation ticket = getUGI();
|
||||||
if (ticket == null) {
|
if (ticket == null) {
|
||||||
|
@ -263,7 +236,12 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
negotiate(ch);
|
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
||||||
|
if (useSasl) {
|
||||||
|
saslNegotiate(ch);
|
||||||
|
} else {
|
||||||
|
established(ch);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}).channel();
|
}).channel();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,93 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.util.concurrent.Promise;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Implements a Netty RPC client handler for the preamble response from the RPC server
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class NettyRpcNegotiateHandler extends ChannelDuplexHandler {
|
|
||||||
private static final Log LOG = LogFactory.getLog(NettyRpcNegotiateHandler.class);
|
|
||||||
private final RpcConnection conn;
|
|
||||||
private final Promise<Boolean> promise;
|
|
||||||
private final boolean fallbackAllowed;
|
|
||||||
|
|
||||||
public NettyRpcNegotiateHandler(RpcConnection conn, Promise<Boolean> promise,
|
|
||||||
boolean fallbackAllowed) {
|
|
||||||
this.conn = conn;
|
|
||||||
this.promise = promise;
|
|
||||||
this.fallbackAllowed = fallbackAllowed;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void attemptToFallback() {
|
|
||||||
if (fallbackAllowed) {
|
|
||||||
LOG.info("Server asked us to fall back to SIMPLE auth. Falling back...");
|
|
||||||
conn.useSasl = false;
|
|
||||||
} else {
|
|
||||||
LOG.error("Server asked us to fall back to SIMPLE auth, " +
|
|
||||||
"but we are not configured for that behavior!");
|
|
||||||
handleFailure(new FallbackDisallowedException());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleFailure(Exception e) {
|
|
||||||
promise.tryFailure(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
ByteBuf buf = (ByteBuf) msg;
|
|
||||||
int status;
|
|
||||||
try {
|
|
||||||
status = buf.readInt();
|
|
||||||
if (status == 0) {
|
|
||||||
if (buf.readInt() == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
|
|
||||||
attemptToFallback();
|
|
||||||
}
|
|
||||||
promise.trySuccess(true);
|
|
||||||
}
|
|
||||||
handleFailure(new IOException("Error while establishing connection to server"));
|
|
||||||
} catch (Exception e) {
|
|
||||||
handleFailure(e);
|
|
||||||
} finally {
|
|
||||||
buf.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) {
|
|
||||||
ctx.flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
||||||
conn.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -58,7 +58,7 @@ abstract class RpcConnection {
|
||||||
|
|
||||||
protected final AuthMethod authMethod;
|
protected final AuthMethod authMethod;
|
||||||
|
|
||||||
protected boolean useSasl;
|
protected final boolean useSasl;
|
||||||
|
|
||||||
protected final Token<? extends TokenIdentifier> token;
|
protected final Token<? extends TokenIdentifier> token;
|
||||||
|
|
||||||
|
|
|
@ -1301,6 +1301,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
|
|
||||||
private AuthMethod authMethod;
|
private AuthMethod authMethod;
|
||||||
private boolean saslContextEstablished;
|
private boolean saslContextEstablished;
|
||||||
|
private boolean skipInitialSaslHandshake;
|
||||||
private ByteBuffer unwrappedData;
|
private ByteBuffer unwrappedData;
|
||||||
// When is this set? FindBugs wants to know! Says NP
|
// When is this set? FindBugs wants to know! Says NP
|
||||||
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
||||||
|
@ -1577,7 +1578,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
preambleBuffer.flip();
|
preambleBuffer.flip();
|
||||||
for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
|
for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
|
||||||
if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
|
if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
|
||||||
doRawSaslReply(SaslStatus.ERROR, null, null, null);
|
|
||||||
return doBadPreambleHandling("Expected HEADER=" +
|
return doBadPreambleHandling("Expected HEADER=" +
|
||||||
Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
|
Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
|
||||||
Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
|
Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
|
||||||
|
@ -1588,23 +1588,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
|
byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
|
||||||
this.authMethod = AuthMethod.valueOf(authbyte);
|
this.authMethod = AuthMethod.valueOf(authbyte);
|
||||||
if (version != CURRENT_VERSION) {
|
if (version != CURRENT_VERSION) {
|
||||||
doRawSaslReply(SaslStatus.ERROR, null, null, null);
|
|
||||||
String msg = getFatalConnectionString(version, authbyte);
|
String msg = getFatalConnectionString(version, authbyte);
|
||||||
return doBadPreambleHandling(msg, new WrongVersionException(msg));
|
return doBadPreambleHandling(msg, new WrongVersionException(msg));
|
||||||
}
|
}
|
||||||
if (authMethod == null) {
|
if (authMethod == null) {
|
||||||
doRawSaslReply(SaslStatus.ERROR, null, null, null);
|
|
||||||
String msg = getFatalConnectionString(version, authbyte);
|
String msg = getFatalConnectionString(version, authbyte);
|
||||||
return doBadPreambleHandling(msg, new BadAuthException(msg));
|
return doBadPreambleHandling(msg, new BadAuthException(msg));
|
||||||
}
|
}
|
||||||
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
|
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
|
||||||
// server side uses non-simple auth, client side uses simple auth.
|
|
||||||
if (allowFallbackToSimpleAuth) {
|
if (allowFallbackToSimpleAuth) {
|
||||||
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
|
|
||||||
metrics.authenticationFallback();
|
metrics.authenticationFallback();
|
||||||
authenticatedWithFallback = true;
|
authenticatedWithFallback = true;
|
||||||
} else {
|
} else {
|
||||||
doRawSaslReply(SaslStatus.ERROR, null, null, null);
|
|
||||||
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
|
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
|
||||||
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
|
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
|
||||||
responder.doRespond(authFailedCall);
|
responder.doRespond(authFailedCall);
|
||||||
|
@ -1612,17 +1607,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
|
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
|
||||||
// server side uses simple auth, client side uses non-simple auth.
|
|
||||||
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
|
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
|
||||||
SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
|
SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
|
||||||
authMethod = AuthMethod.SIMPLE;
|
authMethod = AuthMethod.SIMPLE;
|
||||||
} else if (authMethod != AuthMethod.SIMPLE) {
|
// client has already sent the initial Sasl message and we
|
||||||
// both server and client side use non-simple auth.
|
// should ignore it. Both client and server should fall back
|
||||||
|
// to simple auth from now on.
|
||||||
|
skipInitialSaslHandshake = true;
|
||||||
|
}
|
||||||
|
if (authMethod != AuthMethod.SIMPLE) {
|
||||||
useSasl = true;
|
useSasl = true;
|
||||||
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
|
|
||||||
} else if (!isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
|
|
||||||
// both server and client side use simple auth.
|
|
||||||
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
preambleBuffer = null; // do not need it anymore
|
preambleBuffer = null; // do not need it anymore
|
||||||
|
@ -1774,6 +1768,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
private void process() throws IOException, InterruptedException {
|
private void process() throws IOException, InterruptedException {
|
||||||
data.flip();
|
data.flip();
|
||||||
try {
|
try {
|
||||||
|
if (skipInitialSaslHandshake) {
|
||||||
|
skipInitialSaslHandshake = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (useSasl) {
|
if (useSasl) {
|
||||||
saslReadAndProcess(data);
|
saslReadAndProcess(data);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -89,9 +89,6 @@ public class TestRpcServerSlowConnectionSetup {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
|
socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
|
||||||
socket.getOutputStream().flush();
|
socket.getOutputStream().flush();
|
||||||
DataInputStream pis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
|
|
||||||
int responseCode = pis.readInt();
|
|
||||||
assertEquals(responseCode, 0);
|
|
||||||
|
|
||||||
ConnectionHeader header = ConnectionHeader.newBuilder()
|
ConnectionHeader header = ConnectionHeader.newBuilder()
|
||||||
.setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName())
|
.setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName())
|
||||||
|
|
|
@ -1,221 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.security;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
|
||||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
|
|
||||||
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotSame;
|
|
||||||
import static org.junit.Assert.assertSame;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.protobuf.BlockingService;
|
|
||||||
import com.google.protobuf.ServiceException;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
|
|
||||||
import org.apache.commons.lang.RandomStringUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
|
||||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
|
|
||||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.rules.ExpectedException;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
@Category({SecurityTests.class, SmallTests.class})
|
|
||||||
public class TestInsecureIPC {
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
private static final File KEYTAB_FILE =
|
|
||||||
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
|
|
||||||
|
|
||||||
private static MiniKdc KDC;
|
|
||||||
private static String HOST = "localhost";
|
|
||||||
private static String PRINCIPAL;
|
|
||||||
|
|
||||||
String krbKeytab;
|
|
||||||
String krbPrincipal;
|
|
||||||
|
|
||||||
Configuration clientConf;
|
|
||||||
Configuration serverConf;
|
|
||||||
UserGroupInformation ugi;
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
public ExpectedException exception = ExpectedException.none();
|
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: rpcClientImpl={0}")
|
|
||||||
public static Collection<Object[]> parameters() {
|
|
||||||
return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()},
|
|
||||||
new Object[]{NettyRpcClient.class.getName()});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Parameterized.Parameter
|
|
||||||
public String rpcClientImpl;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUp() throws Exception {
|
|
||||||
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
|
|
||||||
PRINCIPAL = "hbase/" + HOST;
|
|
||||||
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
|
|
||||||
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws IOException {
|
|
||||||
if (KDC != null) {
|
|
||||||
KDC.stop();
|
|
||||||
}
|
|
||||||
TEST_UTIL.cleanupTestDir();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUpTest() throws Exception {
|
|
||||||
krbKeytab = getKeytabFileForTesting();
|
|
||||||
krbPrincipal = getPrincipalForTesting();
|
|
||||||
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
|
||||||
clientConf = getSecuredConfiguration();
|
|
||||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
|
|
||||||
serverConf = HBaseConfiguration.create();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcInsecureClientAgainstInsecureServer() throws Exception {
|
|
||||||
String clientUsername = "testuser";
|
|
||||||
UserGroupInformation clientUgi =
|
|
||||||
UserGroupInformation.createUserForTesting(clientUsername, new String[]{clientUsername});
|
|
||||||
|
|
||||||
assertNotSame(ugi, clientUgi);
|
|
||||||
assertEquals(UserGroupInformation.AuthenticationMethod.SIMPLE,
|
|
||||||
clientUgi.getAuthenticationMethod());
|
|
||||||
assertEquals(clientUsername, clientUgi.getUserName());
|
|
||||||
|
|
||||||
clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
|
|
||||||
callInsecureRpcService(User.create(clientUgi));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRpcFallbackToSimpleFromKerberosClientAgainstInsecureServer() throws Exception {
|
|
||||||
UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
|
|
||||||
|
|
||||||
assertSame(ugi, ugi2);
|
|
||||||
assertEquals(UserGroupInformation.AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
|
|
||||||
assertEquals(krbPrincipal, ugi.getUserName());
|
|
||||||
|
|
||||||
serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
|
|
||||||
callInsecureRpcService(User.create(ugi2));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void callInsecureRpcService(User clientUser) throws Exception {
|
|
||||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
|
||||||
|
|
||||||
RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestInsecureIPC",
|
|
||||||
Lists.newArrayList(
|
|
||||||
new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE,
|
|
||||||
null)),
|
|
||||||
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
|
|
||||||
rpcServer.start();
|
|
||||||
try (RpcClient rpcClient =
|
|
||||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
|
||||||
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
|
|
||||||
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
|
|
||||||
TestInsecureIPC.TestThread th1 = new TestInsecureIPC.TestThread(stub);
|
|
||||||
final Throwable[] exception = new Throwable[1];
|
|
||||||
Collections.synchronizedList(new ArrayList<Throwable>());
|
|
||||||
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
|
|
||||||
public void uncaughtException(Thread th, Throwable ex) {
|
|
||||||
exception[0] = ex;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
th1.setUncaughtExceptionHandler(exceptionHandler);
|
|
||||||
th1.start();
|
|
||||||
th1.join();
|
|
||||||
if (exception[0] != null) {
|
|
||||||
// throw root cause.
|
|
||||||
while (exception[0].getCause() != null) {
|
|
||||||
exception[0] = exception[0].getCause();
|
|
||||||
}
|
|
||||||
throw (Exception) exception[0];
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
|
|
||||||
throws Exception {
|
|
||||||
Configuration cnf = new Configuration();
|
|
||||||
cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
|
||||||
UserGroupInformation.setConfiguration(cnf);
|
|
||||||
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
|
|
||||||
return UserGroupInformation.getLoginUser();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TestThread extends Thread {
|
|
||||||
private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
|
|
||||||
|
|
||||||
public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) {
|
|
||||||
this.stub = stub;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
int[] messageSize = new int[]{100, 1000, 10000};
|
|
||||||
for (int i = 0; i < messageSize.length; i++) {
|
|
||||||
String input = RandomStringUtils.random(messageSize[i]);
|
|
||||||
String result =
|
|
||||||
stub.echo(null,
|
|
||||||
TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()).getMessage();
|
|
||||||
assertEquals(input, result);
|
|
||||||
}
|
|
||||||
} catch (ServiceException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -176,16 +176,13 @@ public class TestSecureIPC {
|
||||||
setRpcProtection("authentication", "privacy,authentication");
|
setRpcProtection("authentication", "privacy,authentication");
|
||||||
callRpcService(User.create(ugi));
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
setRpcProtection("integrity,authentication",
|
setRpcProtection("integrity,authentication", "privacy,authentication");
|
||||||
"privacy,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
setRpcProtection("integrity,authentication",
|
setRpcProtection("integrity,authentication", "integrity,authentication");
|
||||||
"integrity,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
callRpcService(User.create(ugi));
|
||||||
|
|
||||||
setRpcProtection("privacy,authentication",
|
setRpcProtection("privacy,authentication", "privacy,authentication");
|
||||||
"privacy,authentication");
|
|
||||||
callRpcService(User.create(ugi));
|
callRpcService(User.create(ugi));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,8 +262,8 @@ public class TestSecureIPC {
|
||||||
for (int i = 0; i < messageSize.length; i++) {
|
for (int i = 0; i < messageSize.length; i++) {
|
||||||
String input = RandomStringUtils.random(messageSize[i]);
|
String input = RandomStringUtils.random(messageSize[i]);
|
||||||
String result =
|
String result =
|
||||||
stub.echo(null,
|
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
|
||||||
TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()).getMessage();
|
.getMessage();
|
||||||
assertEquals(input, result);
|
assertEquals(input, result);
|
||||||
}
|
}
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
|
|
Loading…
Reference in New Issue