HBASE-27278 Improve TestTlsIPC to reuse existing IPC test code (#4682)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
parent
39b496e191
commit
3309108ca7
|
@ -230,7 +230,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
* Encapsulate the ugly casting and RuntimeException conversion in private method.
|
||||
* @return Codec to use on this client.
|
||||
*/
|
||||
Codec getCodec() {
|
||||
protected Codec getCodec() {
|
||||
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
|
||||
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
|
||||
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
|
||||
|
@ -251,7 +251,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
}
|
||||
|
||||
// for writing tests that want to throw exception when connecting.
|
||||
boolean isTcpNoDelay() {
|
||||
protected boolean isTcpNoDelay() {
|
||||
return tcpNoDelay;
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
|
|||
}
|
||||
|
||||
/** Used in test only. */
|
||||
NettyRpcClient(Configuration configuration) {
|
||||
public NettyRpcClient(Configuration configuration) {
|
||||
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
|
||||
}
|
||||
|
||||
|
|
|
@ -281,7 +281,7 @@ class NettyRpcConnection extends RpcConnection {
|
|||
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
|
||||
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
|
||||
.handler(new ChannelInitializer() {
|
||||
.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.crypto.tls;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.security.Security;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
/**
|
||||
* Base class for parameterized unit tests that use X509TestContext for testing different X509
|
||||
* parameter combinations (CA key type, cert key type, with/without a password, with/without
|
||||
* hostname verification, etc). This base class takes care of setting up / cleaning up the test
|
||||
* environment, and caching the X509TestContext objects used by the tests.
|
||||
* <p/>
|
||||
* This file has been copied from the Apache ZooKeeper project.
|
||||
* @see <a href=
|
||||
* "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/BaseX509ParameterizedTestCase.java">Base
|
||||
* revision</a>
|
||||
*/
|
||||
public abstract class BaseX509ParameterizedTestCase {
|
||||
protected static final String KEY_NON_EMPTY_PASSWORD = "pa$$w0rd";
|
||||
protected static final String KEY_EMPTY_PASSWORD = "";
|
||||
|
||||
/**
|
||||
* Because key generation and writing / deleting files is kind of expensive, we cache the certs
|
||||
* and on-disk files between test cases. None of the test cases modify any of this data so it's
|
||||
* safe to reuse between tests. This caching makes all test cases after the first one for a given
|
||||
* parameter combination complete almost instantly.
|
||||
*/
|
||||
protected static Map<Integer, X509TestContext> cachedTestContexts;
|
||||
protected static File tempDir;
|
||||
|
||||
protected X509TestContext x509TestContext;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBaseClass() throws Exception {
|
||||
Security.addProvider(new BouncyCastleProvider());
|
||||
cachedTestContexts = new HashMap<>();
|
||||
tempDir = Files.createTempDirectory("x509Tests").toFile();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUpBaseClass() {
|
||||
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
||||
cachedTestContexts.clear();
|
||||
cachedTestContexts = null;
|
||||
try {
|
||||
FileUtils.deleteDirectory(tempDir);
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Init method. See example usage in {@link TestX509Util}.
|
||||
* @param paramIndex the index under which the X509TestContext should be cached.
|
||||
* @param contextSupplier a function that creates and returns the X509TestContext for the current
|
||||
* index if one is not already cached.
|
||||
*/
|
||||
protected void init(Integer paramIndex, Supplier<X509TestContext> contextSupplier) {
|
||||
if (cachedTestContexts.containsKey(paramIndex)) {
|
||||
x509TestContext = cachedTestContexts.get(paramIndex);
|
||||
} else {
|
||||
x509TestContext = contextSupplier.get();
|
||||
cachedTestContexts.put(paramIndex, x509TestContext);
|
||||
}
|
||||
}
|
||||
|
||||
protected void init(final X509KeyType caKeyType, final X509KeyType certKeyType,
|
||||
final String keyPassword, final Integer paramIndex) throws Exception {
|
||||
init(paramIndex, () -> {
|
||||
try {
|
||||
return X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
|
||||
.setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
|
||||
.setTrustStoreKeyType(caKeyType).build();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -17,27 +17,39 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.crypto.tls;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.Security;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.KeyManagerException;
|
||||
import org.apache.hadoop.hbase.exceptions.SSLContextException;
|
||||
import org.apache.hadoop.hbase.exceptions.TrustManagerException;
|
||||
import org.apache.hadoop.hbase.exceptions.X509Exception;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -55,12 +67,16 @@ import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
|
|||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MiscTests.class, SmallTests.class })
|
||||
public class TestX509Util extends BaseX509ParameterizedTestCase {
|
||||
public class TestX509Util {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestX509Util.class);
|
||||
|
||||
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
|
||||
|
||||
private static X509TestContextProvider PROVIDER;
|
||||
|
||||
@Parameterized.Parameter()
|
||||
public X509KeyType caKeyType;
|
||||
|
||||
|
@ -73,6 +89,10 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
@Parameterized.Parameter(value = 3)
|
||||
public Integer paramIndex;
|
||||
|
||||
private X509TestContext x509TestContext;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
@Parameterized.Parameters(
|
||||
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
|
||||
public static Collection<Object[]> data() {
|
||||
|
@ -80,7 +100,7 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
int paramIndex = 0;
|
||||
for (X509KeyType caKeyType : X509KeyType.values()) {
|
||||
for (X509KeyType certKeyType : X509KeyType.values()) {
|
||||
for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) {
|
||||
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
|
||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
|
||||
}
|
||||
}
|
||||
|
@ -88,22 +108,34 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
return params;
|
||||
}
|
||||
|
||||
private Configuration hbaseConf;
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws IOException {
|
||||
Security.addProvider(new BouncyCastleProvider());
|
||||
File dir = new File(UTIL.getDataTestDir(TestX509Util.class.getSimpleName()).toString())
|
||||
.getCanonicalFile();
|
||||
FileUtils.forceMkdir(dir);
|
||||
PROVIDER = new X509TestContextProvider(UTIL.getConfiguration(), dir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword,
|
||||
Integer paramIndex) throws Exception {
|
||||
super.init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
hbaseConf = x509TestContext.getHbaseConf();
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() {
|
||||
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
|
||||
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
conf = new Configuration(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
x509TestContext.clearSystemProperties();
|
||||
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||
x509TestContext.clearConfigurations();
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
||||
System.clearProperty("com.sun.security.enableCRLDP");
|
||||
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
||||
|
@ -112,69 +144,59 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testCreateSSLContextWithoutCustomProtocol() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
SslContext sslContext = X509Util.createSslContextForClient(hbaseConf);
|
||||
SslContext sslContext = X509Util.createSslContextForClient(conf);
|
||||
ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
|
||||
assertEquals(new String[] { X509Util.DEFAULT_PROTOCOL },
|
||||
assertArrayEquals(new String[] { X509Util.DEFAULT_PROTOCOL },
|
||||
sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSSLContextWithCustomProtocol() throws Exception {
|
||||
final String protocol = "TLSv1.1";
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_PROTOCOL, protocol);
|
||||
conf.set(X509Util.TLS_CONFIG_PROTOCOL, protocol);
|
||||
ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
|
||||
SslContext sslContext = X509Util.createSslContextForServer(hbaseConf);
|
||||
SslContext sslContext = X509Util.createSslContextForServer(conf);
|
||||
assertEquals(Collections.singletonList(protocol),
|
||||
Arrays.asList(sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols()));
|
||||
}
|
||||
|
||||
@Test(expected = SSLContextException.class)
|
||||
public void testCreateSSLContextWithoutKeyStoreLocationServer() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
|
||||
X509Util.createSslContextForServer(hbaseConf);
|
||||
conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
|
||||
X509Util.createSslContextForServer(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSSLContextWithoutKeyStoreLocationClient() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
|
||||
X509Util.createSslContextForClient(hbaseConf);
|
||||
conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
|
||||
X509Util.createSslContextForClient(conf);
|
||||
}
|
||||
|
||||
@Test(expected = X509Exception.class)
|
||||
public void testCreateSSLContextWithoutKeyStorePassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
if (!x509TestContext.isKeyStoreEncrypted()) {
|
||||
throw new SSLContextException("");
|
||||
}
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
|
||||
X509Util.createSslContextForServer(hbaseConf);
|
||||
assumeTrue(x509TestContext.isKeyStoreEncrypted());
|
||||
conf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
|
||||
X509Util.createSslContextForServer(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSSLContextWithoutTrustStoreLocationClient() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
|
||||
X509Util.createSslContextForClient(hbaseConf);
|
||||
conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
|
||||
X509Util.createSslContextForClient(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSSLContextWithoutTrustStoreLocationServer() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
|
||||
X509Util.createSslContextForServer(hbaseConf);
|
||||
conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
|
||||
X509Util.createSslContextForServer(conf);
|
||||
}
|
||||
|
||||
// It would be great to test the value of PKIXBuilderParameters#setRevocationEnabled,
|
||||
// but it does not appear to be possible
|
||||
@Test
|
||||
public void testCRLEnabled() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
hbaseConf.setBoolean(X509Util.TLS_CONFIG_CLR, true);
|
||||
X509Util.createSslContextForServer(hbaseConf);
|
||||
conf.setBoolean(X509Util.TLS_CONFIG_CLR, true);
|
||||
X509Util.createSslContextForServer(conf);
|
||||
assertTrue(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
|
||||
assertTrue(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
|
||||
assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
|
||||
|
@ -182,8 +204,7 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testCRLDisabled() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
X509Util.createSslContextForServer(hbaseConf);
|
||||
X509Util.createSslContextForServer(conf);
|
||||
assertFalse(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
|
||||
assertFalse(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
|
||||
assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
|
||||
|
@ -191,7 +212,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSKeyStore() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
// Make sure we can instantiate a key manager from the JKS file on disk
|
||||
X509Util.createKeyManager(
|
||||
x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
|
||||
|
@ -200,10 +220,7 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSKeyStoreNullPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
if (!x509TestContext.getKeyStorePassword().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
assumeTrue(x509TestContext.getKeyStorePassword().isEmpty());
|
||||
// Make sure that empty password and null password are treated the same
|
||||
X509Util.createKeyManager(
|
||||
x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
|
||||
|
@ -212,7 +229,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSKeyStoreFileTypeDefaultToJks() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
// Make sure we can instantiate a key manager from the JKS file on disk
|
||||
X509Util.createKeyManager(
|
||||
x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
|
||||
|
@ -222,7 +238,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSKeyStoreWithWrongPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
assertThrows(KeyManagerException.class, () -> {
|
||||
// Attempting to load with the wrong key password should fail
|
||||
X509Util.createKeyManager(
|
||||
|
@ -233,7 +248,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSTrustStore() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
// Make sure we can instantiate a trust manager from the JKS file on disk
|
||||
X509Util.createTrustManager(
|
||||
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
|
||||
|
@ -242,7 +256,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSTrustStoreNullPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
if (!x509TestContext.getTrustStorePassword().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -254,18 +267,15 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadJKSTrustStoreFileTypeDefaultToJks() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
// Make sure we can instantiate a trust manager from the JKS file on disk
|
||||
X509Util.createTrustManager(
|
||||
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
|
||||
x509TestContext.getTrustStorePassword(), null, // null StoreFileType means 'autodetect from
|
||||
// file extension'
|
||||
true, true);
|
||||
// null StoreFileType means 'autodetect from file extension'
|
||||
x509TestContext.getTrustStorePassword(), null, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadJKSTrustStoreWithWrongPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
assertThrows(TrustManagerException.class, () -> {
|
||||
// Attempting to load with the wrong key password should fail
|
||||
X509Util.createTrustManager(
|
||||
|
@ -276,7 +286,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadPKCS12KeyStore() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
// Make sure we can instantiate a key manager from the PKCS12 file on disk
|
||||
X509Util.createKeyManager(
|
||||
x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
|
||||
|
@ -285,7 +294,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadPKCS12KeyStoreNullPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
if (!x509TestContext.getKeyStorePassword().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -297,7 +305,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadPKCS12KeyStoreWithWrongPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
assertThrows(KeyManagerException.class, () -> {
|
||||
// Attempting to load with the wrong key password should fail
|
||||
X509Util.createKeyManager(
|
||||
|
@ -308,7 +315,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadPKCS12TrustStore() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
// Make sure we can instantiate a trust manager from the PKCS12 file on disk
|
||||
X509Util.createTrustManager(
|
||||
x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
|
||||
|
@ -318,7 +324,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadPKCS12TrustStoreNullPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
if (!x509TestContext.getTrustStorePassword().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -330,7 +335,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testLoadPKCS12TrustStoreWithWrongPassword() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
assertThrows(TrustManagerException.class, () -> {
|
||||
// Attempting to load with the wrong key password should fail
|
||||
X509Util.createTrustManager(
|
||||
|
@ -341,47 +345,41 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
|
|||
|
||||
@Test
|
||||
public void testGetDefaultCipherSuitesJava8() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("1.8");
|
||||
// Java 8 default should have the CBC suites first
|
||||
assertTrue(cipherSuites[0].contains("CBC"));
|
||||
assertThat(cipherSuites[0], containsString("CBC"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDefaultCipherSuitesJava9() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("9");
|
||||
// Java 9+ default should have the GCM suites first
|
||||
assertTrue(cipherSuites[0].contains("GCM"));
|
||||
assertThat(cipherSuites[0], containsString("GCM"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDefaultCipherSuitesJava10() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("10");
|
||||
// Java 9+ default should have the GCM suites first
|
||||
assertTrue(cipherSuites[0].contains("GCM"));
|
||||
assertThat(cipherSuites[0], containsString("GCM"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDefaultCipherSuitesJava11() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("11");
|
||||
// Java 9+ default should have the GCM suites first
|
||||
assertTrue(cipherSuites[0].contains("GCM"));
|
||||
assertThat(cipherSuites[0], containsString("GCM"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDefaultCipherSuitesUnknownVersion() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("notaversion");
|
||||
// If version can't be parsed, use the more conservative Java 8 default
|
||||
assertTrue(cipherSuites[0].contains("CBC"));
|
||||
assertThat(cipherSuites[0], containsString("CBC"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDefaultCipherSuitesNullVersion() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
assertThrows(NullPointerException.class, () -> {
|
||||
X509Util.getDefaultCipherSuitesForJavaVersion(null);
|
||||
});
|
||||
|
|
|
@ -53,7 +53,7 @@ public final class X509TestContext {
|
|||
private static final String KEY_STORE_PREFIX = "hbase_test_key";
|
||||
|
||||
private final File tempDir;
|
||||
private final Configuration hbaseConf = HBaseConfiguration.create();
|
||||
private final Configuration conf;
|
||||
|
||||
private final X509Certificate trustStoreCertificate;
|
||||
private final String trustStorePassword;
|
||||
|
@ -70,6 +70,7 @@ public final class X509TestContext {
|
|||
|
||||
/**
|
||||
* Constructor is intentionally private, use the Builder class instead.
|
||||
* @param conf the configuration
|
||||
* @param tempDir the directory in which key store and trust store temp files will be
|
||||
* written.
|
||||
* @param trustStoreKeyPair the key pair for the trust store.
|
||||
|
@ -78,12 +79,13 @@ public final class X509TestContext {
|
|||
* @param keyStoreKeyPair the key pair for the key store.
|
||||
* @param keyStorePassword the password to protect the key store private key.
|
||||
*/
|
||||
private X509TestContext(File tempDir, KeyPair trustStoreKeyPair, String trustStorePassword,
|
||||
KeyPair keyStoreKeyPair, String keyStorePassword)
|
||||
private X509TestContext(Configuration conf, File tempDir, KeyPair trustStoreKeyPair,
|
||||
String trustStorePassword, KeyPair keyStoreKeyPair, String keyStorePassword)
|
||||
throws IOException, GeneralSecurityException, OperatorCreationException {
|
||||
if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
|
||||
throw new IllegalStateException("BC Security provider was not found");
|
||||
}
|
||||
this.conf = conf;
|
||||
this.tempDir = requireNonNull(tempDir);
|
||||
if (!tempDir.isDirectory()) {
|
||||
throw new IllegalArgumentException("Not a directory: " + tempDir);
|
||||
|
@ -204,8 +206,8 @@ public final class X509TestContext {
|
|||
return keyStorePassword.length() > 0;
|
||||
}
|
||||
|
||||
public Configuration getHbaseConf() {
|
||||
return hbaseConf;
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -301,25 +303,25 @@ public final class X509TestContext {
|
|||
* @param trustStoreFileType the store file type to use for the trust store (JKS, PEM, ...).
|
||||
* @throws IOException if there is an error creating the key store file or trust store file.
|
||||
*/
|
||||
public void setSystemProperties(KeyStoreFileType keyStoreFileType,
|
||||
public void setConfigurations(KeyStoreFileType keyStoreFileType,
|
||||
KeyStoreFileType trustStoreFileType) throws IOException {
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION,
|
||||
conf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION,
|
||||
this.getKeyStoreFile(keyStoreFileType).getAbsolutePath());
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD, this.getKeyStorePassword());
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, keyStoreFileType.getPropertyValue());
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION,
|
||||
conf.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD, this.getKeyStorePassword());
|
||||
conf.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, keyStoreFileType.getPropertyValue());
|
||||
conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION,
|
||||
this.getTrustStoreFile(trustStoreFileType).getAbsolutePath());
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD, this.getTrustStorePassword());
|
||||
hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE, trustStoreFileType.getPropertyValue());
|
||||
conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD, this.getTrustStorePassword());
|
||||
conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE, trustStoreFileType.getPropertyValue());
|
||||
}
|
||||
|
||||
public void clearSystemProperties() {
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_TYPE);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD);
|
||||
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE);
|
||||
public void clearConfigurations() {
|
||||
conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
|
||||
conf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
|
||||
conf.unset(X509Util.TLS_CONFIG_KEYSTORE_TYPE);
|
||||
conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
|
||||
conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD);
|
||||
conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -327,6 +329,7 @@ public final class X509TestContext {
|
|||
*/
|
||||
public static class Builder {
|
||||
|
||||
private final Configuration conf;
|
||||
private File tempDir;
|
||||
private X509KeyType trustStoreKeyType;
|
||||
private String trustStorePassword;
|
||||
|
@ -334,9 +337,10 @@ public final class X509TestContext {
|
|||
private String keyStorePassword;
|
||||
|
||||
/**
|
||||
* Creates an empty builder.
|
||||
* Creates an empty builder with the given Configuration.
|
||||
*/
|
||||
public Builder() {
|
||||
public Builder(Configuration conf) {
|
||||
this.conf = conf;
|
||||
trustStoreKeyType = X509KeyType.EC;
|
||||
trustStorePassword = "";
|
||||
keyStoreKeyType = X509KeyType.EC;
|
||||
|
@ -351,8 +355,8 @@ public final class X509TestContext {
|
|||
throws IOException, GeneralSecurityException, OperatorCreationException {
|
||||
KeyPair trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType);
|
||||
KeyPair keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType);
|
||||
return new X509TestContext(tempDir, trustStoreKeyPair, trustStorePassword, keyStoreKeyPair,
|
||||
keyStorePassword);
|
||||
return new X509TestContext(conf, tempDir, trustStoreKeyPair, trustStorePassword,
|
||||
keyStoreKeyPair, keyStorePassword);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -416,7 +420,14 @@ public final class X509TestContext {
|
|||
* @return a new Builder.
|
||||
*/
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
return newBuilder(HBaseConfiguration.create());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new default-constructed Builder.
|
||||
* @return a new Builder.
|
||||
*/
|
||||
public static Builder newBuilder(Configuration conf) {
|
||||
return new Builder(conf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.crypto.tls;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Objects;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
|
||||
|
||||
/**
|
||||
* Will cache X509TestContext to speed up tests.
|
||||
*/
|
||||
public class X509TestContextProvider {
|
||||
|
||||
private static final class CacheKey {
|
||||
private final X509KeyType caKeyType;
|
||||
|
||||
private final X509KeyType certKeyType;
|
||||
|
||||
private final String keyPassword;
|
||||
|
||||
CacheKey(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword) {
|
||||
this.caKeyType = caKeyType;
|
||||
this.certKeyType = certKeyType;
|
||||
this.keyPassword = keyPassword;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(caKeyType, certKeyType, keyPassword);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof CacheKey)) {
|
||||
return false;
|
||||
}
|
||||
CacheKey other = (CacheKey) obj;
|
||||
return caKeyType == other.caKeyType && certKeyType == other.certKeyType
|
||||
&& Objects.equals(keyPassword, other.keyPassword);
|
||||
}
|
||||
}
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private final File tempDir;
|
||||
|
||||
private final LoadingCache<CacheKey, X509TestContext> ctxs =
|
||||
CacheBuilder.newBuilder().build(new CacheLoader<CacheKey, X509TestContext>() {
|
||||
|
||||
@Override
|
||||
public X509TestContext load(CacheKey key) throws Exception {
|
||||
return X509TestContext.newBuilder(conf).setTempDir(tempDir)
|
||||
.setKeyStorePassword(key.keyPassword).setKeyStoreKeyType(key.certKeyType)
|
||||
.setTrustStorePassword(key.keyPassword).setTrustStoreKeyType(key.caKeyType).build();
|
||||
}
|
||||
});
|
||||
|
||||
public X509TestContextProvider(Configuration conf, File tempDir) {
|
||||
this.conf = conf;
|
||||
this.tempDir = tempDir;
|
||||
}
|
||||
|
||||
public X509TestContext get(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword) {
|
||||
return ctxs.getUnchecked(new CacheKey(caKeyType, certKeyType, keyPassword));
|
||||
}
|
||||
}
|
|
@ -69,7 +69,7 @@ public class IntegrationTestRpcClient {
|
|||
protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
|
||||
return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
|
||||
@Override
|
||||
Codec getCodec() {
|
||||
protected Codec getCodec() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -96,7 +96,7 @@ public class NettyRpcServer extends RpcServer {
|
|||
// Get the event loop group configuration from the server class if available.
|
||||
NettyEventLoopGroupConfig config = null;
|
||||
if (server instanceof HBaseServerBase) {
|
||||
config = ((HBaseServerBase) server).getEventLoopGroupConfig();
|
||||
config = ((HBaseServerBase<?>) server).getEventLoopGroupConfig();
|
||||
}
|
||||
if (config == null) {
|
||||
config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
|
||||
|
|
|
@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -103,7 +102,7 @@ public abstract class AbstractTestIPC {
|
|||
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
|
||||
}
|
||||
|
||||
protected abstract RpcServer createRpcServer(final Server server, final String name,
|
||||
protected abstract RpcServer createRpcServer(final String name,
|
||||
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException;
|
||||
|
||||
|
@ -117,11 +116,11 @@ public abstract class AbstractTestIPC {
|
|||
*/
|
||||
@Test
|
||||
public void testNoCodec() throws IOException, ServiceException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClientNoCodec(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
|
||||
|
@ -143,18 +142,18 @@ public abstract class AbstractTestIPC {
|
|||
*/
|
||||
@Test
|
||||
public void testCompressCellBlock() throws IOException, ServiceException {
|
||||
Configuration conf = new Configuration(HBaseConfiguration.create());
|
||||
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
int count = 3;
|
||||
for (int i = 0; i < count; i++) {
|
||||
cells.add(CELL);
|
||||
}
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
|
||||
|
@ -179,11 +178,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testRTEDuringConnectionSetup() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
stub.ping(null, EmptyRequestProto.getDefaultInstance());
|
||||
|
@ -201,12 +200,13 @@ public abstract class AbstractTestIPC {
|
|||
*/
|
||||
@Test
|
||||
public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, scheduler);
|
||||
verify(scheduler).init(any(RpcScheduler.Context.class));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
verify(scheduler).start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
|
@ -224,12 +224,12 @@ public abstract class AbstractTestIPC {
|
|||
/** Tests that the rpc scheduler is called when requests arrive. */
|
||||
@Test
|
||||
public void testRpcMaxRequestSize() throws IOException, ServiceException {
|
||||
Configuration conf = new Configuration(CONF);
|
||||
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
StringBuilder message = new StringBuilder(1200);
|
||||
|
@ -258,11 +258,12 @@ public abstract class AbstractTestIPC {
|
|||
@Test
|
||||
public void testRpcServerForNotNullRemoteAddressInCallObject()
|
||||
throws IOException, ServiceException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
assertEquals(localAddr.getAddress().getHostAddress(),
|
||||
|
@ -274,10 +275,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testRemoteError() throws IOException, ServiceException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
stub.error(null, EmptyRequestProto.getDefaultInstance());
|
||||
|
@ -293,10 +295,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testTimeout() throws IOException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
|
||||
|
@ -323,19 +326,19 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name,
|
||||
protected abstract RpcServer createTestFailingRpcServer(final String name,
|
||||
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException;
|
||||
|
||||
/** Tests that the connection closing is handled by the client with outstanding RPC calls */
|
||||
@Test
|
||||
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
|
||||
Configuration conf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
|
@ -350,11 +353,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testAsyncEcho() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
Interface stub = newStub(client, rpcServer.getListenerAddress());
|
||||
int num = 10;
|
||||
|
@ -368,10 +371,11 @@ public abstract class AbstractTestIPC {
|
|||
callbackList.add(done);
|
||||
}
|
||||
for (int i = 0; i < num; i++) {
|
||||
EchoResponseProto resp = callbackList.get(i).get();
|
||||
HBaseRpcController pcrc = pcrcList.get(i);
|
||||
assertEquals("hello-" + i, resp.getMessage());
|
||||
assertFalse(pcrc.failed());
|
||||
assertNull(pcrc.cellScanner());
|
||||
assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
|
||||
}
|
||||
} finally {
|
||||
rpcServer.stop();
|
||||
|
@ -380,8 +384,9 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testAsyncRemoteError() throws IOException {
|
||||
AbstractRpcClient<?> client = createRpcClient(CONF);
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
AbstractRpcClient<?> client = createRpcClient(clientConf);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try {
|
||||
|
@ -404,10 +409,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testAsyncTimeout() throws IOException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
Interface stub = newStub(client, rpcServer.getListenerAddress());
|
||||
List<HBaseRpcController> pcrcList = new ArrayList<>();
|
||||
|
@ -485,10 +491,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testTracingSuccessIpc() throws IOException, ServiceException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
|
||||
|
@ -513,10 +520,11 @@ public abstract class AbstractTestIPC {
|
|||
|
||||
@Test
|
||||
public void testTracingErrorIpc() throws IOException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Configuration clientConf = new Configuration(CONF);
|
||||
RpcServer rpcServer = createRpcServer("testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
// use the ISA from the running server so that we can get the port selected.
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||
|
||||
public class FailingNettyRpcServer extends NettyRpcServer {
|
||||
|
||||
public FailingNettyRpcServer(Server server, String name,
|
||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
super(server, name, services, bindAddress, conf, scheduler, true);
|
||||
}
|
||||
|
||||
static final class FailingConnection extends NettyServerRpcConnection {
|
||||
private FailingConnection(FailingNettyRpcServer rpcServer, Channel channel) {
|
||||
super(rpcServer, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
|
||||
// this will throw exception after the connection header is read, and an RPC is sent
|
||||
// from client
|
||||
throw new DoNotRetryIOException("Failing for test");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
|
||||
return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
|
||||
@Override
|
||||
protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
|
||||
return new FailingConnection(FailingNettyRpcServer.this, channel);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -40,17 +40,17 @@ public class TestBlockingIPC extends AbstractTestIPC {
|
|||
HBaseClassTestRule.forClass(TestBlockingIPC.class);
|
||||
|
||||
@Override
|
||||
protected RpcServer createRpcServer(Server server, String name,
|
||||
protected RpcServer createRpcServer(String name,
|
||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
|
||||
return RpcServerFactory.createRpcServer(null, name, services, bindAddress, conf, scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
|
||||
return new BlockingRpcClient(conf) {
|
||||
@Override
|
||||
Codec getCodec() {
|
||||
protected Codec getCodec() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
@ -67,7 +67,7 @@ public class TestBlockingIPC extends AbstractTestIPC {
|
|||
return new BlockingRpcClient(conf) {
|
||||
|
||||
@Override
|
||||
boolean isTcpNoDelay() {
|
||||
protected boolean isTcpNoDelay() {
|
||||
throw new RuntimeException("Injected fault");
|
||||
}
|
||||
};
|
||||
|
@ -102,9 +102,9 @@ public class TestBlockingIPC extends AbstractTestIPC {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RpcServer createTestFailingRpcServer(Server server, String name,
|
||||
protected RpcServer createTestFailingRpcServer(String name,
|
||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
|
||||
return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,11 +23,8 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
import org.apache.hadoop.hbase.util.JVM;
|
||||
|
@ -40,7 +37,6 @@ import org.junit.runners.Parameterized;
|
|||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
|
||||
|
@ -107,10 +103,10 @@ public class TestNettyIPC extends AbstractTestIPC {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected RpcServer createRpcServer(Server server, String name,
|
||||
protected RpcServer createRpcServer(String name,
|
||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
|
||||
return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,7 +115,7 @@ public class TestNettyIPC extends AbstractTestIPC {
|
|||
return new NettyRpcClient(conf) {
|
||||
|
||||
@Override
|
||||
Codec getCodec() {
|
||||
protected Codec getCodec() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -138,48 +134,16 @@ public class TestNettyIPC extends AbstractTestIPC {
|
|||
return new NettyRpcClient(conf) {
|
||||
|
||||
@Override
|
||||
boolean isTcpNoDelay() {
|
||||
protected boolean isTcpNoDelay() {
|
||||
throw new RuntimeException("Injected fault");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static class TestFailingRpcServer extends NettyRpcServer {
|
||||
|
||||
TestFailingRpcServer(Server server, String name,
|
||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
super(server, name, services, bindAddress, conf, scheduler, true);
|
||||
}
|
||||
|
||||
static final class FailingConnection extends NettyServerRpcConnection {
|
||||
private FailingConnection(TestFailingRpcServer rpcServer, Channel channel) {
|
||||
super(rpcServer, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
|
||||
// this will throw exception after the connection header is read, and an RPC is sent
|
||||
// from client
|
||||
throw new DoNotRetryIOException("Failing for test");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
|
||||
return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) {
|
||||
@Override
|
||||
protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
|
||||
return new FailingConnection(TestFailingRpcServer.this, channel);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcServer createTestFailingRpcServer(Server server, String name,
|
||||
protected RpcServer createTestFailingRpcServer(String name,
|
||||
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
|
||||
Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
|
||||
return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||
|
||||
|
@ -26,6 +26,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.exceptions.SSLContextException;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
import org.junit.After;
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.Security;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
||||
import org.apache.hadoop.hbase.HBaseServerBase;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.AbstractTestIPC;
|
||||
import org.apache.hadoop.hbase.ipc.FailingNettyRpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ RPCTests.class, MediumTests.class })
|
||||
public class TestNettyTlsIPC extends AbstractTestIPC {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestNettyTlsIPC.class);
|
||||
|
||||
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(CONF);
|
||||
|
||||
private static X509TestContextProvider PROVIDER;
|
||||
|
||||
private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
|
||||
|
||||
private static HBaseServerBase<?> SERVER;
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public X509KeyType caKeyType;
|
||||
|
||||
@Parameterized.Parameter(1)
|
||||
public X509KeyType certKeyType;
|
||||
|
||||
@Parameterized.Parameter(2)
|
||||
public String keyPassword;
|
||||
|
||||
@Parameterized.Parameter(3)
|
||||
public boolean acceptPlainText;
|
||||
|
||||
@Parameterized.Parameter(4)
|
||||
public boolean clientTlsEnabled;
|
||||
|
||||
private X509TestContext x509TestContext;
|
||||
|
||||
@Parameterized.Parameters(
|
||||
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
|
||||
+ " clientTlsEnabled={4}")
|
||||
public static List<Object[]> data() {
|
||||
List<Object[]> params = new ArrayList<>();
|
||||
for (X509KeyType caKeyType : X509KeyType.values()) {
|
||||
for (X509KeyType certKeyType : X509KeyType.values()) {
|
||||
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
|
||||
// do not accept plain text
|
||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
|
||||
// support plain text and client enables tls
|
||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
|
||||
// support plain text and client disables tls
|
||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws IOException {
|
||||
Security.addProvider(new BouncyCastleProvider());
|
||||
File dir = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
|
||||
.getCanonicalFile();
|
||||
FileUtils.forceMkdir(dir);
|
||||
// server must enable tls
|
||||
CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
|
||||
PROVIDER = new X509TestContextProvider(CONF, dir);
|
||||
EVENT_LOOP_GROUP_CONFIG =
|
||||
NettyEventLoopGroupConfig.setup(CONF, TestNettyTlsIPC.class.getSimpleName());
|
||||
SERVER = mock(HBaseServerBase.class);
|
||||
when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws InterruptedException {
|
||||
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
||||
EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync();
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
|
||||
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlainText);
|
||||
CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, clientTlsEnabled);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
x509TestContext.clearConfigurations();
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
||||
System.clearProperty("com.sun.security.enableCRLDP");
|
||||
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
||||
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
|
||||
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
|
||||
return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf) {
|
||||
return new NettyRpcClient(conf) {
|
||||
|
||||
@Override
|
||||
protected Codec getCodec() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractRpcClient<?> createRpcClient(Configuration conf) {
|
||||
return new NettyRpcClient(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(Configuration conf)
|
||||
throws IOException {
|
||||
return new NettyRpcClient(conf) {
|
||||
|
||||
@Override
|
||||
protected boolean isTcpNoDelay() {
|
||||
throw new RuntimeException("Injected fault");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcServer createTestFailingRpcServer(String name,
|
||||
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
|
||||
RpcScheduler scheduler) throws IOException {
|
||||
return new FailingNettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.Security;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ RPCTests.class, MediumTests.class })
|
||||
public class TestNettyTlsIPCRejectPlainText {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestNettyTlsIPCRejectPlainText.class);
|
||||
|
||||
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
|
||||
|
||||
private static File DIR;
|
||||
|
||||
private static X509TestContextProvider PROVIDER;
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public X509KeyType caKeyType;
|
||||
|
||||
@Parameterized.Parameter(1)
|
||||
public X509KeyType certKeyType;
|
||||
|
||||
@Parameterized.Parameter(2)
|
||||
public String keyPassword;
|
||||
|
||||
private X509TestContext x509TestContext;
|
||||
|
||||
private RpcServer rpcServer;
|
||||
|
||||
private RpcClient rpcClient;
|
||||
|
||||
@Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}")
|
||||
public static List<Object[]> data() {
|
||||
List<Object[]> params = new ArrayList<>();
|
||||
for (X509KeyType caKeyType : X509KeyType.values()) {
|
||||
for (X509KeyType certKeyType : X509KeyType.values()) {
|
||||
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
|
||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword });
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws IOException {
|
||||
Security.addProvider(new BouncyCastleProvider());
|
||||
DIR = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
|
||||
.getCanonicalFile();
|
||||
FileUtils.forceMkdir(DIR);
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
|
||||
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
|
||||
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false);
|
||||
PROVIDER = new X509TestContextProvider(conf, DIR);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() {
|
||||
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
|
||||
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
rpcServer = new NettyRpcServer(null, "testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
|
||||
rpcServer.start();
|
||||
rpcClient = new NettyRpcClient(conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
if (rpcServer != null) {
|
||||
rpcServer.stop();
|
||||
}
|
||||
Closeables.close(rpcClient, true);
|
||||
x509TestContext.clearConfigurations();
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
||||
System.clearProperty("com.sun.security.enableCRLDP");
|
||||
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
||||
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReject() throws IOException, ServiceException {
|
||||
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress());
|
||||
ServiceException se = assertThrows(ServiceException.class,
|
||||
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello world").build()));
|
||||
assertThat(se.getCause(), instanceOf(ConnectionClosedException.class));
|
||||
}
|
||||
}
|
|
@ -1,209 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.security;
|
||||
|
||||
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED;
|
||||
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
|
||||
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.Security;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.BaseX509ParameterizedTestCase;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
|
||||
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ SecurityTests.class, MediumTests.class })
|
||||
public class TestTlsIPC extends BaseX509ParameterizedTestCase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTlsIPC.class);
|
||||
|
||||
@Parameterized.Parameter()
|
||||
public X509KeyType caKeyType;
|
||||
|
||||
@Parameterized.Parameter(value = 1)
|
||||
public X509KeyType certKeyType;
|
||||
|
||||
@Parameterized.Parameter(value = 2)
|
||||
public String keyPassword;
|
||||
|
||||
@Parameterized.Parameter(value = 3)
|
||||
public Integer paramIndex;
|
||||
|
||||
@Parameterized.Parameters(
|
||||
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
|
||||
public static Collection<Object[]> data() {
|
||||
List<Object[]> params = new ArrayList<>();
|
||||
int paramIndex = 0;
|
||||
for (X509KeyType caKeyType : X509KeyType.values()) {
|
||||
for (X509KeyType certKeyType : X509KeyType.values()) {
|
||||
for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) {
|
||||
params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
private static final String RPC_CLIENT_IMPL = NettyRpcClient.class.getName();
|
||||
private static final String RPC_SERVER_IMPL = NettyRpcServer.class.getName();
|
||||
private static final String HOST = "localhost";
|
||||
|
||||
private UserGroupInformation ugi;
|
||||
private Configuration tlsConfiguration;
|
||||
private Configuration clientConf;
|
||||
private Configuration serverConf;
|
||||
|
||||
@Override
|
||||
public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword,
|
||||
Integer paramIndex) throws Exception {
|
||||
super.init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
tlsConfiguration = x509TestContext.getHbaseConf();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpTest() throws Exception {
|
||||
init(caKeyType, certKeyType, keyPassword, paramIndex);
|
||||
String clientusername = "testuser";
|
||||
ugi =
|
||||
UserGroupInformation.createUserForTesting(clientusername, new String[] { clientusername });
|
||||
clientConf = HBaseConfiguration.create(tlsConfiguration);
|
||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
|
||||
serverConf = HBaseConfiguration.create(tlsConfiguration);
|
||||
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
x509TestContext.clearSystemProperties();
|
||||
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_OCSP);
|
||||
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_CLR);
|
||||
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
|
||||
System.clearProperty("com.sun.net.ssl.checkRevocation");
|
||||
System.clearProperty("com.sun.security.enableCRLDP");
|
||||
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
|
||||
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoPlaintext() throws Exception {
|
||||
setTLSEncryption(true, false, true);
|
||||
callRpcService(User.create(ugi));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRejectPlaintext() {
|
||||
setTLSEncryption(true, false, false);
|
||||
Assert.assertThrows(ConnectionClosedException.class, () -> callRpcService(User.create(ugi)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcceptPlaintext() throws Exception {
|
||||
setTLSEncryption(true, true, false);
|
||||
callRpcService(User.create(ugi));
|
||||
}
|
||||
|
||||
void setTLSEncryption(Boolean server, Boolean acceptPlaintext, Boolean client) {
|
||||
serverConf.set(HBASE_SERVER_NETTY_TLS_ENABLED, server.toString());
|
||||
serverConf.set(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlaintext.toString());
|
||||
clientConf.set(HBASE_CLIENT_NETTY_TLS_ENABLED, client.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
|
||||
* the stub, this function will throw root cause of that exception.
|
||||
*/
|
||||
private void callRpcService(User clientUser) throws Exception {
|
||||
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
|
||||
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
|
||||
|
||||
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
|
||||
|
||||
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
|
||||
Lists
|
||||
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
|
||||
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
|
||||
rpcServer.start();
|
||||
try (RpcClient rpcClient =
|
||||
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
|
||||
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
|
||||
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
|
||||
TestSecureIPC.TestThread th = new TestSecureIPC.TestThread(stub);
|
||||
AtomicReference<Throwable> exception = new AtomicReference<>();
|
||||
Collections.synchronizedList(new ArrayList<Throwable>());
|
||||
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
|
||||
@Override
|
||||
public void uncaughtException(Thread th, Throwable ex) {
|
||||
exception.set(ex);
|
||||
}
|
||||
};
|
||||
th.setUncaughtExceptionHandler(exceptionHandler);
|
||||
th.start();
|
||||
th.join();
|
||||
if (exception.get() != null) {
|
||||
// throw root cause.
|
||||
while (exception.get().getCause() != null) {
|
||||
exception.set(exception.get().getCause());
|
||||
}
|
||||
throw (Exception) exception.get();
|
||||
}
|
||||
} finally {
|
||||
rpcServer.stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -106,7 +106,7 @@ public class TestTlsWithKerberos {
|
|||
.setTempDir(new File(TEST_UTIL.getDataTestDir().toUri().getPath()))
|
||||
.setKeyStorePassword("Pa$$word").setKeyStoreKeyType(X509KeyType.RSA)
|
||||
.setTrustStoreKeyType(X509KeyType.RSA).setTrustStorePassword("Pa$$word").build();
|
||||
x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -123,10 +123,10 @@ public class TestTlsWithKerberos {
|
|||
krbKeytab = getKeytabFileForTesting();
|
||||
krbPrincipal = getPrincipalForTesting();
|
||||
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
|
||||
clientConf = HBaseConfiguration.create(x509TestContext.getHbaseConf());
|
||||
clientConf = HBaseConfiguration.create(x509TestContext.getConf());
|
||||
setSecuredConfiguration(clientConf);
|
||||
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
|
||||
serverConf = HBaseConfiguration.create(x509TestContext.getHbaseConf());
|
||||
serverConf = HBaseConfiguration.create(x509TestContext.getConf());
|
||||
setSecuredConfiguration(serverConf);
|
||||
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue