HBASE-27278 Improve TestTlsIPC to reuse existing IPC test code (#4682)

Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
(cherry picked from commit 3309108ca7)
This commit is contained in:
Duo Zhang 2022-08-12 12:17:29 +08:00
parent 3f5076dac5
commit b820188c68
17 changed files with 692 additions and 508 deletions

View File

@ -230,7 +230,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
* Encapsulate the ugly casting and RuntimeException conversion in private method. * Encapsulate the ugly casting and RuntimeException conversion in private method.
* @return Codec to use on this client. * @return Codec to use on this client.
*/ */
Codec getCodec() { protected Codec getCodec() {
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
@ -251,7 +251,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} }
// for writing tests that want to throw exception when connecting. // for writing tests that want to throw exception when connecting.
boolean isTcpNoDelay() { protected boolean isTcpNoDelay() {
return tcpNoDelay; return tcpNoDelay;
} }

View File

@ -73,7 +73,7 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
} }
/** Used in test only. */ /** Used in test only. */
NettyRpcClient(Configuration configuration) { public NettyRpcClient(Configuration configuration) {
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
} }

View File

@ -282,7 +282,7 @@ class NettyRpcConnection extends RpcConnection {
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.handler(new ChannelInitializer() { .handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) { if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {

View File

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.crypto.tls;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.security.Security;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* Base class for parameterized unit tests that use X509TestContext for testing different X509
* parameter combinations (CA key type, cert key type, with/without a password, with/without
* hostname verification, etc). This base class takes care of setting up / cleaning up the test
* environment, and caching the X509TestContext objects used by the tests.
* <p/>
* This file has been copied from the Apache ZooKeeper project.
* @see <a href=
* "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/BaseX509ParameterizedTestCase.java">Base
* revision</a>
*/
public abstract class BaseX509ParameterizedTestCase {
protected static final String KEY_NON_EMPTY_PASSWORD = "pa$$w0rd";
protected static final String KEY_EMPTY_PASSWORD = "";
/**
* Because key generation and writing / deleting files is kind of expensive, we cache the certs
* and on-disk files between test cases. None of the test cases modify any of this data so it's
* safe to reuse between tests. This caching makes all test cases after the first one for a given
* parameter combination complete almost instantly.
*/
protected static Map<Integer, X509TestContext> cachedTestContexts;
protected static File tempDir;
protected X509TestContext x509TestContext;
@BeforeClass
public static void setUpBaseClass() throws Exception {
Security.addProvider(new BouncyCastleProvider());
cachedTestContexts = new HashMap<>();
tempDir = Files.createTempDirectory("x509Tests").toFile();
}
@AfterClass
public static void cleanUpBaseClass() {
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
cachedTestContexts.clear();
cachedTestContexts = null;
try {
FileUtils.deleteDirectory(tempDir);
} catch (IOException e) {
// ignore
}
}
/**
* Init method. See example usage in {@link TestX509Util}.
* @param paramIndex the index under which the X509TestContext should be cached.
* @param contextSupplier a function that creates and returns the X509TestContext for the current
* index if one is not already cached.
*/
protected void init(Integer paramIndex, Supplier<X509TestContext> contextSupplier) {
if (cachedTestContexts.containsKey(paramIndex)) {
x509TestContext = cachedTestContexts.get(paramIndex);
} else {
x509TestContext = contextSupplier.get();
cachedTestContexts.put(paramIndex, x509TestContext);
}
}
protected void init(final X509KeyType caKeyType, final X509KeyType certKeyType,
final String keyPassword, final Integer paramIndex) throws Exception {
init(paramIndex, () -> {
try {
return X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
.setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
.setTrustStoreKeyType(caKeyType).build();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -17,27 +17,39 @@
*/ */
package org.apache.hadoop.hbase.io.crypto.tls; package org.apache.hadoop.hbase.io.crypto.tls;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.security.Security; import java.security.Security;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.exceptions.KeyManagerException; import org.apache.hadoop.hbase.exceptions.KeyManagerException;
import org.apache.hadoop.hbase.exceptions.SSLContextException; import org.apache.hadoop.hbase.exceptions.SSLContextException;
import org.apache.hadoop.hbase.exceptions.TrustManagerException; import org.apache.hadoop.hbase.exceptions.TrustManagerException;
import org.apache.hadoop.hbase.exceptions.X509Exception; import org.apache.hadoop.hbase.exceptions.X509Exception;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -55,12 +67,16 @@ import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@Category({ MiscTests.class, SmallTests.class }) @Category({ MiscTests.class, SmallTests.class })
public class TestX509Util extends BaseX509ParameterizedTestCase { public class TestX509Util {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestX509Util.class); HBaseClassTestRule.forClass(TestX509Util.class);
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static X509TestContextProvider PROVIDER;
@Parameterized.Parameter() @Parameterized.Parameter()
public X509KeyType caKeyType; public X509KeyType caKeyType;
@ -73,6 +89,10 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Parameterized.Parameter(value = 3) @Parameterized.Parameter(value = 3)
public Integer paramIndex; public Integer paramIndex;
private X509TestContext x509TestContext;
private Configuration conf;
@Parameterized.Parameters( @Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}") name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
@ -80,7 +100,7 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
int paramIndex = 0; int paramIndex = 0;
for (X509KeyType caKeyType : X509KeyType.values()) { for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) { for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) { for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ }); params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
} }
} }
@ -88,22 +108,34 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
return params; return params;
} }
private Configuration hbaseConf; @BeforeClass
public static void setUpBeforeClass() throws IOException {
Security.addProvider(new BouncyCastleProvider());
File dir = new File(UTIL.getDataTestDir(TestX509Util.class.getSimpleName()).toString())
.getCanonicalFile();
FileUtils.forceMkdir(dir);
PROVIDER = new X509TestContextProvider(UTIL.getConfiguration(), dir);
}
@Override @AfterClass
public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword, public static void tearDownAfterClass() {
Integer paramIndex) throws Exception { Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
super.init(caKeyType, certKeyType, keyPassword, paramIndex); UTIL.cleanupTestDir();
x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS); }
hbaseConf = x509TestContext.getHbaseConf();
@Before
public void setUp() throws IOException {
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
conf = new Configuration(UTIL.getConfiguration());
} }
@After @After
public void cleanUp() { public void cleanUp() {
x509TestContext.clearSystemProperties(); x509TestContext.clearConfigurations();
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_OCSP); x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_CLR); x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_PROTOCOL); x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation"); System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP"); System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString()); Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
@ -112,69 +144,59 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testCreateSSLContextWithoutCustomProtocol() throws Exception { public void testCreateSSLContextWithoutCustomProtocol() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); SslContext sslContext = X509Util.createSslContextForClient(conf);
SslContext sslContext = X509Util.createSslContextForClient(hbaseConf);
ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class); ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
assertEquals(new String[] { X509Util.DEFAULT_PROTOCOL }, assertArrayEquals(new String[] { X509Util.DEFAULT_PROTOCOL },
sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols()); sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols());
} }
@Test @Test
public void testCreateSSLContextWithCustomProtocol() throws Exception { public void testCreateSSLContextWithCustomProtocol() throws Exception {
final String protocol = "TLSv1.1"; final String protocol = "TLSv1.1";
init(caKeyType, certKeyType, keyPassword, paramIndex); conf.set(X509Util.TLS_CONFIG_PROTOCOL, protocol);
hbaseConf.set(X509Util.TLS_CONFIG_PROTOCOL, protocol);
ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class); ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
SslContext sslContext = X509Util.createSslContextForServer(hbaseConf); SslContext sslContext = X509Util.createSslContextForServer(conf);
assertEquals(Collections.singletonList(protocol), assertEquals(Collections.singletonList(protocol),
Arrays.asList(sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols())); Arrays.asList(sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols()));
} }
@Test(expected = SSLContextException.class) @Test(expected = SSLContextException.class)
public void testCreateSSLContextWithoutKeyStoreLocationServer() throws Exception { public void testCreateSSLContextWithoutKeyStoreLocationServer() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION); X509Util.createSslContextForServer(conf);
X509Util.createSslContextForServer(hbaseConf);
} }
@Test @Test
public void testCreateSSLContextWithoutKeyStoreLocationClient() throws Exception { public void testCreateSSLContextWithoutKeyStoreLocationClient() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION); X509Util.createSslContextForClient(conf);
X509Util.createSslContextForClient(hbaseConf);
} }
@Test(expected = X509Exception.class) @Test(expected = X509Exception.class)
public void testCreateSSLContextWithoutKeyStorePassword() throws Exception { public void testCreateSSLContextWithoutKeyStorePassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); assumeTrue(x509TestContext.isKeyStoreEncrypted());
if (!x509TestContext.isKeyStoreEncrypted()) { conf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
throw new SSLContextException(""); X509Util.createSslContextForServer(conf);
}
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
X509Util.createSslContextForServer(hbaseConf);
} }
@Test @Test
public void testCreateSSLContextWithoutTrustStoreLocationClient() throws Exception { public void testCreateSSLContextWithoutTrustStoreLocationClient() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION); X509Util.createSslContextForClient(conf);
X509Util.createSslContextForClient(hbaseConf);
} }
@Test @Test
public void testCreateSSLContextWithoutTrustStoreLocationServer() throws Exception { public void testCreateSSLContextWithoutTrustStoreLocationServer() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION); X509Util.createSslContextForServer(conf);
X509Util.createSslContextForServer(hbaseConf);
} }
// It would be great to test the value of PKIXBuilderParameters#setRevocationEnabled, // It would be great to test the value of PKIXBuilderParameters#setRevocationEnabled,
// but it does not appear to be possible // but it does not appear to be possible
@Test @Test
public void testCRLEnabled() throws Exception { public void testCRLEnabled() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); conf.setBoolean(X509Util.TLS_CONFIG_CLR, true);
hbaseConf.setBoolean(X509Util.TLS_CONFIG_CLR, true); X509Util.createSslContextForServer(conf);
X509Util.createSslContextForServer(hbaseConf);
assertTrue(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation"))); assertTrue(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
assertTrue(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP"))); assertTrue(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable"))); assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
@ -182,8 +204,7 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testCRLDisabled() throws Exception { public void testCRLDisabled() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); X509Util.createSslContextForServer(conf);
X509Util.createSslContextForServer(hbaseConf);
assertFalse(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation"))); assertFalse(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
assertFalse(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP"))); assertFalse(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable"))); assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
@ -191,7 +212,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSKeyStore() throws Exception { public void testLoadJKSKeyStore() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
// Make sure we can instantiate a key manager from the JKS file on disk // Make sure we can instantiate a key manager from the JKS file on disk
X509Util.createKeyManager( X509Util.createKeyManager(
x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
@ -200,10 +220,7 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSKeyStoreNullPassword() throws Exception { public void testLoadJKSKeyStoreNullPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex); assumeTrue(x509TestContext.getKeyStorePassword().isEmpty());
if (!x509TestContext.getKeyStorePassword().isEmpty()) {
return;
}
// Make sure that empty password and null password are treated the same // Make sure that empty password and null password are treated the same
X509Util.createKeyManager( X509Util.createKeyManager(
x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null, x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
@ -212,7 +229,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSKeyStoreFileTypeDefaultToJks() throws Exception { public void testLoadJKSKeyStoreFileTypeDefaultToJks() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
// Make sure we can instantiate a key manager from the JKS file on disk // Make sure we can instantiate a key manager from the JKS file on disk
X509Util.createKeyManager( X509Util.createKeyManager(
x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
@ -222,7 +238,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSKeyStoreWithWrongPassword() throws Exception { public void testLoadJKSKeyStoreWithWrongPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
assertThrows(KeyManagerException.class, () -> { assertThrows(KeyManagerException.class, () -> {
// Attempting to load with the wrong key password should fail // Attempting to load with the wrong key password should fail
X509Util.createKeyManager( X509Util.createKeyManager(
@ -233,7 +248,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSTrustStore() throws Exception { public void testLoadJKSTrustStore() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
// Make sure we can instantiate a trust manager from the JKS file on disk // Make sure we can instantiate a trust manager from the JKS file on disk
X509Util.createTrustManager( X509Util.createTrustManager(
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
@ -242,7 +256,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSTrustStoreNullPassword() throws Exception { public void testLoadJKSTrustStoreNullPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
if (!x509TestContext.getTrustStorePassword().isEmpty()) { if (!x509TestContext.getTrustStorePassword().isEmpty()) {
return; return;
} }
@ -254,18 +267,15 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadJKSTrustStoreFileTypeDefaultToJks() throws Exception { public void testLoadJKSTrustStoreFileTypeDefaultToJks() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
// Make sure we can instantiate a trust manager from the JKS file on disk // Make sure we can instantiate a trust manager from the JKS file on disk
X509Util.createTrustManager( X509Util.createTrustManager(
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
x509TestContext.getTrustStorePassword(), null, // null StoreFileType means 'autodetect from // null StoreFileType means 'autodetect from file extension'
// file extension' x509TestContext.getTrustStorePassword(), null, true, true);
true, true);
} }
@Test @Test
public void testLoadJKSTrustStoreWithWrongPassword() throws Exception { public void testLoadJKSTrustStoreWithWrongPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
assertThrows(TrustManagerException.class, () -> { assertThrows(TrustManagerException.class, () -> {
// Attempting to load with the wrong key password should fail // Attempting to load with the wrong key password should fail
X509Util.createTrustManager( X509Util.createTrustManager(
@ -276,7 +286,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadPKCS12KeyStore() throws Exception { public void testLoadPKCS12KeyStore() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
// Make sure we can instantiate a key manager from the PKCS12 file on disk // Make sure we can instantiate a key manager from the PKCS12 file on disk
X509Util.createKeyManager( X509Util.createKeyManager(
x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
@ -285,7 +294,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadPKCS12KeyStoreNullPassword() throws Exception { public void testLoadPKCS12KeyStoreNullPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
if (!x509TestContext.getKeyStorePassword().isEmpty()) { if (!x509TestContext.getKeyStorePassword().isEmpty()) {
return; return;
} }
@ -297,7 +305,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadPKCS12KeyStoreWithWrongPassword() throws Exception { public void testLoadPKCS12KeyStoreWithWrongPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
assertThrows(KeyManagerException.class, () -> { assertThrows(KeyManagerException.class, () -> {
// Attempting to load with the wrong key password should fail // Attempting to load with the wrong key password should fail
X509Util.createKeyManager( X509Util.createKeyManager(
@ -308,7 +315,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadPKCS12TrustStore() throws Exception { public void testLoadPKCS12TrustStore() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
// Make sure we can instantiate a trust manager from the PKCS12 file on disk // Make sure we can instantiate a trust manager from the PKCS12 file on disk
X509Util.createTrustManager( X509Util.createTrustManager(
x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
@ -318,7 +324,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadPKCS12TrustStoreNullPassword() throws Exception { public void testLoadPKCS12TrustStoreNullPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
if (!x509TestContext.getTrustStorePassword().isEmpty()) { if (!x509TestContext.getTrustStorePassword().isEmpty()) {
return; return;
} }
@ -330,7 +335,6 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testLoadPKCS12TrustStoreWithWrongPassword() throws Exception { public void testLoadPKCS12TrustStoreWithWrongPassword() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
assertThrows(TrustManagerException.class, () -> { assertThrows(TrustManagerException.class, () -> {
// Attempting to load with the wrong key password should fail // Attempting to load with the wrong key password should fail
X509Util.createTrustManager( X509Util.createTrustManager(
@ -341,47 +345,41 @@ public class TestX509Util extends BaseX509ParameterizedTestCase {
@Test @Test
public void testGetDefaultCipherSuitesJava8() throws Exception { public void testGetDefaultCipherSuitesJava8() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("1.8"); String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("1.8");
// Java 8 default should have the CBC suites first // Java 8 default should have the CBC suites first
assertTrue(cipherSuites[0].contains("CBC")); assertThat(cipherSuites[0], containsString("CBC"));
} }
@Test @Test
public void testGetDefaultCipherSuitesJava9() throws Exception { public void testGetDefaultCipherSuitesJava9() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("9"); String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("9");
// Java 9+ default should have the GCM suites first // Java 9+ default should have the GCM suites first
assertTrue(cipherSuites[0].contains("GCM")); assertThat(cipherSuites[0], containsString("GCM"));
} }
@Test @Test
public void testGetDefaultCipherSuitesJava10() throws Exception { public void testGetDefaultCipherSuitesJava10() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("10"); String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("10");
// Java 9+ default should have the GCM suites first // Java 9+ default should have the GCM suites first
assertTrue(cipherSuites[0].contains("GCM")); assertThat(cipherSuites[0], containsString("GCM"));
} }
@Test @Test
public void testGetDefaultCipherSuitesJava11() throws Exception { public void testGetDefaultCipherSuitesJava11() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("11"); String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("11");
// Java 9+ default should have the GCM suites first // Java 9+ default should have the GCM suites first
assertTrue(cipherSuites[0].contains("GCM")); assertThat(cipherSuites[0], containsString("GCM"));
} }
@Test @Test
public void testGetDefaultCipherSuitesUnknownVersion() throws Exception { public void testGetDefaultCipherSuitesUnknownVersion() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("notaversion"); String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("notaversion");
// If version can't be parsed, use the more conservative Java 8 default // If version can't be parsed, use the more conservative Java 8 default
assertTrue(cipherSuites[0].contains("CBC")); assertThat(cipherSuites[0], containsString("CBC"));
} }
@Test @Test
public void testGetDefaultCipherSuitesNullVersion() throws Exception { public void testGetDefaultCipherSuitesNullVersion() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
assertThrows(NullPointerException.class, () -> { assertThrows(NullPointerException.class, () -> {
X509Util.getDefaultCipherSuitesForJavaVersion(null); X509Util.getDefaultCipherSuitesForJavaVersion(null);
}); });

View File

@ -53,7 +53,7 @@ public final class X509TestContext {
private static final String KEY_STORE_PREFIX = "hbase_test_key"; private static final String KEY_STORE_PREFIX = "hbase_test_key";
private final File tempDir; private final File tempDir;
private final Configuration hbaseConf = HBaseConfiguration.create(); private final Configuration conf;
private final X509Certificate trustStoreCertificate; private final X509Certificate trustStoreCertificate;
private final String trustStorePassword; private final String trustStorePassword;
@ -70,6 +70,7 @@ public final class X509TestContext {
/** /**
* Constructor is intentionally private, use the Builder class instead. * Constructor is intentionally private, use the Builder class instead.
* @param conf the configuration
* @param tempDir the directory in which key store and trust store temp files will be * @param tempDir the directory in which key store and trust store temp files will be
* written. * written.
* @param trustStoreKeyPair the key pair for the trust store. * @param trustStoreKeyPair the key pair for the trust store.
@ -78,12 +79,13 @@ public final class X509TestContext {
* @param keyStoreKeyPair the key pair for the key store. * @param keyStoreKeyPair the key pair for the key store.
* @param keyStorePassword the password to protect the key store private key. * @param keyStorePassword the password to protect the key store private key.
*/ */
private X509TestContext(File tempDir, KeyPair trustStoreKeyPair, String trustStorePassword, private X509TestContext(Configuration conf, File tempDir, KeyPair trustStoreKeyPair,
KeyPair keyStoreKeyPair, String keyStorePassword) String trustStorePassword, KeyPair keyStoreKeyPair, String keyStorePassword)
throws IOException, GeneralSecurityException, OperatorCreationException { throws IOException, GeneralSecurityException, OperatorCreationException {
if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) { if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
throw new IllegalStateException("BC Security provider was not found"); throw new IllegalStateException("BC Security provider was not found");
} }
this.conf = conf;
this.tempDir = requireNonNull(tempDir); this.tempDir = requireNonNull(tempDir);
if (!tempDir.isDirectory()) { if (!tempDir.isDirectory()) {
throw new IllegalArgumentException("Not a directory: " + tempDir); throw new IllegalArgumentException("Not a directory: " + tempDir);
@ -204,8 +206,8 @@ public final class X509TestContext {
return keyStorePassword.length() > 0; return keyStorePassword.length() > 0;
} }
public Configuration getHbaseConf() { public Configuration getConf() {
return hbaseConf; return conf;
} }
/** /**
@ -301,25 +303,25 @@ public final class X509TestContext {
* @param trustStoreFileType the store file type to use for the trust store (JKS, PEM, ...). * @param trustStoreFileType the store file type to use for the trust store (JKS, PEM, ...).
* @throws IOException if there is an error creating the key store file or trust store file. * @throws IOException if there is an error creating the key store file or trust store file.
*/ */
public void setSystemProperties(KeyStoreFileType keyStoreFileType, public void setConfigurations(KeyStoreFileType keyStoreFileType,
KeyStoreFileType trustStoreFileType) throws IOException { KeyStoreFileType trustStoreFileType) throws IOException {
hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, conf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION,
this.getKeyStoreFile(keyStoreFileType).getAbsolutePath()); this.getKeyStoreFile(keyStoreFileType).getAbsolutePath());
hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD, this.getKeyStorePassword()); conf.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD, this.getKeyStorePassword());
hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, keyStoreFileType.getPropertyValue()); conf.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, keyStoreFileType.getPropertyValue());
hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION,
this.getTrustStoreFile(trustStoreFileType).getAbsolutePath()); this.getTrustStoreFile(trustStoreFileType).getAbsolutePath());
hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD, this.getTrustStorePassword()); conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD, this.getTrustStorePassword());
hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE, trustStoreFileType.getPropertyValue()); conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE, trustStoreFileType.getPropertyValue());
} }
public void clearSystemProperties() { public void clearConfigurations() {
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION); conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD); conf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_TYPE); conf.unset(X509Util.TLS_CONFIG_KEYSTORE_TYPE);
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION); conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD); conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD);
hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE); conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE);
} }
/** /**
@ -327,6 +329,7 @@ public final class X509TestContext {
*/ */
public static class Builder { public static class Builder {
private final Configuration conf;
private File tempDir; private File tempDir;
private X509KeyType trustStoreKeyType; private X509KeyType trustStoreKeyType;
private String trustStorePassword; private String trustStorePassword;
@ -334,9 +337,10 @@ public final class X509TestContext {
private String keyStorePassword; private String keyStorePassword;
/** /**
* Creates an empty builder. * Creates an empty builder with the given Configuration.
*/ */
public Builder() { public Builder(Configuration conf) {
this.conf = conf;
trustStoreKeyType = X509KeyType.EC; trustStoreKeyType = X509KeyType.EC;
trustStorePassword = ""; trustStorePassword = "";
keyStoreKeyType = X509KeyType.EC; keyStoreKeyType = X509KeyType.EC;
@ -351,8 +355,8 @@ public final class X509TestContext {
throws IOException, GeneralSecurityException, OperatorCreationException { throws IOException, GeneralSecurityException, OperatorCreationException {
KeyPair trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType); KeyPair trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType);
KeyPair keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType); KeyPair keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType);
return new X509TestContext(tempDir, trustStoreKeyPair, trustStorePassword, keyStoreKeyPair, return new X509TestContext(conf, tempDir, trustStoreKeyPair, trustStorePassword,
keyStorePassword); keyStoreKeyPair, keyStorePassword);
} }
/** /**
@ -416,7 +420,14 @@ public final class X509TestContext {
* @return a new Builder. * @return a new Builder.
*/ */
public static Builder newBuilder() { public static Builder newBuilder() {
return new Builder(); return newBuilder(HBaseConfiguration.create());
} }
/**
* Returns a new default-constructed Builder.
* @return a new Builder.
*/
public static Builder newBuilder(Configuration conf) {
return new Builder(conf);
}
} }

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.crypto.tls;
import java.io.File;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* Will cache X509TestContext to speed up tests.
*/
public class X509TestContextProvider {
private static final class CacheKey {
private final X509KeyType caKeyType;
private final X509KeyType certKeyType;
private final String keyPassword;
CacheKey(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword) {
this.caKeyType = caKeyType;
this.certKeyType = certKeyType;
this.keyPassword = keyPassword;
}
@Override
public int hashCode() {
return Objects.hash(caKeyType, certKeyType, keyPassword);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof CacheKey)) {
return false;
}
CacheKey other = (CacheKey) obj;
return caKeyType == other.caKeyType && certKeyType == other.certKeyType
&& Objects.equals(keyPassword, other.keyPassword);
}
}
private final Configuration conf;
private final File tempDir;
private final LoadingCache<CacheKey, X509TestContext> ctxs =
CacheBuilder.newBuilder().build(new CacheLoader<CacheKey, X509TestContext>() {
@Override
public X509TestContext load(CacheKey key) throws Exception {
return X509TestContext.newBuilder(conf).setTempDir(tempDir)
.setKeyStorePassword(key.keyPassword).setKeyStoreKeyType(key.certKeyType)
.setTrustStorePassword(key.keyPassword).setTrustStoreKeyType(key.caKeyType).build();
}
});
public X509TestContextProvider(Configuration conf, File tempDir) {
this.conf = conf;
this.tempDir = tempDir;
}
public X509TestContext get(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword) {
return ctxs.getUnchecked(new CacheKey(caKeyType, certKeyType, keyPassword));
}
}

View File

@ -69,7 +69,7 @@ public class IntegrationTestRpcClient {
protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) { protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) { return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
@Override @Override
Codec getCodec() { protected Codec getCodec() {
return null; return null;
} }
}; };

View File

@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -103,7 +102,7 @@ public abstract class AbstractTestIPC {
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
} }
protected abstract RpcServer createRpcServer(final Server server, final String name, protected abstract RpcServer createRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException; Configuration conf, RpcScheduler scheduler) throws IOException;
@ -117,11 +116,11 @@ public abstract class AbstractTestIPC {
*/ */
@Test @Test
public void testNoCodec() throws IOException, ServiceException { public void testNoCodec() throws IOException, ServiceException {
Configuration conf = HBaseConfiguration.create(); Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) { try (AbstractRpcClient<?> client = createRpcClientNoCodec(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
HBaseRpcController pcrc = new HBaseRpcControllerImpl(); HBaseRpcController pcrc = new HBaseRpcControllerImpl();
@ -143,18 +142,18 @@ public abstract class AbstractTestIPC {
*/ */
@Test @Test
public void testCompressCellBlock() throws IOException, ServiceException { public void testCompressCellBlock() throws IOException, ServiceException {
Configuration conf = new Configuration(HBaseConfiguration.create()); Configuration clientConf = new Configuration(CONF);
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
List<Cell> cells = new ArrayList<>(); List<Cell> cells = new ArrayList<>();
int count = 3; int count = 3;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
cells.add(CELL); cells.add(CELL);
} }
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
@ -179,11 +178,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testRTEDuringConnectionSetup() throws Exception { public void testRTEDuringConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) { try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.ping(null, EmptyRequestProto.getDefaultInstance()); stub.ping(null, EmptyRequestProto.getDefaultInstance());
@ -201,12 +200,13 @@ public abstract class AbstractTestIPC {
*/ */
@Test @Test
public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
Configuration clientConf = new Configuration(CONF);
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, scheduler); new InetSocketAddress("localhost", 0), CONF, scheduler);
verify(scheduler).init(any(RpcScheduler.Context.class)); verify(scheduler).init(any(RpcScheduler.Context.class));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
verify(scheduler).start(); verify(scheduler).start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
@ -224,12 +224,12 @@ public abstract class AbstractTestIPC {
/** Tests that the rpc scheduler is called when requests arrive. */ /** Tests that the rpc scheduler is called when requests arrive. */
@Test @Test
public void testRpcMaxRequestSize() throws IOException, ServiceException { public void testRpcMaxRequestSize() throws IOException, ServiceException {
Configuration conf = new Configuration(CONF); Configuration clientConf = new Configuration(CONF);
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1)); new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
StringBuilder message = new StringBuilder(1200); StringBuilder message = new StringBuilder(1200);
@ -258,11 +258,12 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testRpcServerForNotNullRemoteAddressInCallObject() public void testRpcServerForNotNullRemoteAddressInCallObject()
throws IOException, ServiceException { throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
assertEquals(localAddr.getAddress().getHostAddress(), assertEquals(localAddr.getAddress().getHostAddress(),
@ -274,10 +275,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testRemoteError() throws IOException, ServiceException { public void testRemoteError() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.error(null, EmptyRequestProto.getDefaultInstance()); stub.error(null, EmptyRequestProto.getDefaultInstance());
@ -293,10 +295,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testTimeout() throws IOException { public void testTimeout() throws IOException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
HBaseRpcController pcrc = new HBaseRpcControllerImpl(); HBaseRpcController pcrc = new HBaseRpcControllerImpl();
@ -323,19 +326,19 @@ public abstract class AbstractTestIPC {
} }
} }
protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name, protected abstract RpcServer createTestFailingRpcServer(final String name,
final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException; Configuration conf, RpcScheduler scheduler) throws IOException;
/** Tests that the connection closing is handled by the client with outstanding RPC calls */ /** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test @Test
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
Configuration conf = new Configuration(CONF); Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer", RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@ -350,11 +353,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testAsyncEcho() throws IOException { public void testAsyncEcho() throws IOException {
Configuration conf = HBaseConfiguration.create(); Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer(null, "testRpcServer", RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(conf)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
Interface stub = newStub(client, rpcServer.getListenerAddress()); Interface stub = newStub(client, rpcServer.getListenerAddress());
int num = 10; int num = 10;
@ -368,10 +371,11 @@ public abstract class AbstractTestIPC {
callbackList.add(done); callbackList.add(done);
} }
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
EchoResponseProto resp = callbackList.get(i).get();
HBaseRpcController pcrc = pcrcList.get(i); HBaseRpcController pcrc = pcrcList.get(i);
assertEquals("hello-" + i, resp.getMessage());
assertFalse(pcrc.failed()); assertFalse(pcrc.failed());
assertNull(pcrc.cellScanner()); assertNull(pcrc.cellScanner());
assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
} }
} finally { } finally {
rpcServer.stop(); rpcServer.stop();
@ -380,8 +384,9 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testAsyncRemoteError() throws IOException { public void testAsyncRemoteError() throws IOException {
AbstractRpcClient<?> client = createRpcClient(CONF); Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer(null, "testRpcServer", AbstractRpcClient<?> client = createRpcClient(clientConf);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try { try {
@ -404,10 +409,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testAsyncTimeout() throws IOException { public void testAsyncTimeout() throws IOException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
Interface stub = newStub(client, rpcServer.getListenerAddress()); Interface stub = newStub(client, rpcServer.getListenerAddress());
List<HBaseRpcController> pcrcList = new ArrayList<>(); List<HBaseRpcController> pcrcList = new ArrayList<>();
@ -485,10 +491,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testTracingSuccessIpc() throws IOException, ServiceException { public void testTracingSuccessIpc() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
@ -513,10 +520,11 @@ public abstract class AbstractTestIPC {
@Test @Test
public void testTracingErrorIpc() throws IOException { public void testTracingErrorIpc() throws IOException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer", Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) { try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
// use the ISA from the running server so that we can get the port selected. // use the ISA from the running server so that we can get the port selected.

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
public class FailingNettyRpcServer extends NettyRpcServer {
public FailingNettyRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true);
}
static final class FailingConnection extends NettyServerRpcConnection {
private FailingConnection(FailingNettyRpcServer rpcServer, Channel channel) {
super(rpcServer, channel);
}
@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}
@Override
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
@Override
protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
return new FailingConnection(FailingNettyRpcServer.this, channel);
}
};
}
}

View File

@ -40,17 +40,17 @@ public class TestBlockingIPC extends AbstractTestIPC {
HBaseClassTestRule.forClass(TestBlockingIPC.class); HBaseClassTestRule.forClass(TestBlockingIPC.class);
@Override @Override
protected RpcServer createRpcServer(Server server, String name, protected RpcServer createRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException { Configuration conf, RpcScheduler scheduler) throws IOException {
return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); return RpcServerFactory.createRpcServer(null, name, services, bindAddress, conf, scheduler);
} }
@Override @Override
protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) { protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
return new BlockingRpcClient(conf) { return new BlockingRpcClient(conf) {
@Override @Override
Codec getCodec() { protected Codec getCodec() {
return null; return null;
} }
}; };
@ -67,7 +67,7 @@ public class TestBlockingIPC extends AbstractTestIPC {
return new BlockingRpcClient(conf) { return new BlockingRpcClient(conf) {
@Override @Override
boolean isTcpNoDelay() { protected boolean isTcpNoDelay() {
throw new RuntimeException("Injected fault"); throw new RuntimeException("Injected fault");
} }
}; };
@ -102,9 +102,9 @@ public class TestBlockingIPC extends AbstractTestIPC {
} }
@Override @Override
protected RpcServer createTestFailingRpcServer(Server server, String name, protected RpcServer createTestFailingRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException { Configuration conf, RpcScheduler scheduler) throws IOException {
return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler); return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
} }
} }

View File

@ -23,11 +23,8 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.JVM;
@ -40,7 +37,6 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
@ -107,10 +103,10 @@ public class TestNettyIPC extends AbstractTestIPC {
} }
@Override @Override
protected RpcServer createRpcServer(Server server, String name, protected RpcServer createRpcServer(String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException { Configuration conf, RpcScheduler scheduler) throws IOException {
return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true); return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true);
} }
@Override @Override
@ -119,7 +115,7 @@ public class TestNettyIPC extends AbstractTestIPC {
return new NettyRpcClient(conf) { return new NettyRpcClient(conf) {
@Override @Override
Codec getCodec() { protected Codec getCodec() {
return null; return null;
} }
@ -138,48 +134,16 @@ public class TestNettyIPC extends AbstractTestIPC {
return new NettyRpcClient(conf) { return new NettyRpcClient(conf) {
@Override @Override
boolean isTcpNoDelay() { protected boolean isTcpNoDelay() {
throw new RuntimeException("Injected fault"); throw new RuntimeException("Injected fault");
} }
}; };
} }
private static class TestFailingRpcServer extends NettyRpcServer { @Override
protected RpcServer createTestFailingRpcServer(String name,
TestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException { Configuration conf, RpcScheduler scheduler) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, true); return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
}
static final class FailingConnection extends NettyServerRpcConnection {
private FailingConnection(TestFailingRpcServer rpcServer, Channel channel) {
super(rpcServer, channel);
}
@Override
public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}
@Override
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) {
@Override
protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
return new FailingConnection(TestFailingRpcServer.this, channel);
}
};
}
}
@Override
protected RpcServer createTestFailingRpcServer(Server server, String name,
List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
Configuration conf, RpcScheduler scheduler) throws IOException {
return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler);
} }
} }

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
@ -26,6 +26,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.exceptions.SSLContextException; import org.apache.hadoop.hbase.exceptions.SSLContextException;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.After; import org.junit.After;

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.AbstractTestIPC;
import org.apache.hadoop.hbase.ipc.FailingNettyRpcServer;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestNettyTlsIPC extends AbstractTestIPC {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyTlsIPC.class);
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(CONF);
private static X509TestContextProvider PROVIDER;
private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
private static HRegionServer SERVER;
@Parameterized.Parameter(0)
public X509KeyType caKeyType;
@Parameterized.Parameter(1)
public X509KeyType certKeyType;
@Parameterized.Parameter(2)
public String keyPassword;
@Parameterized.Parameter(3)
public boolean acceptPlainText;
@Parameterized.Parameter(4)
public boolean clientTlsEnabled;
private X509TestContext x509TestContext;
@Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, acceptPlainText={3},"
+ " clientTlsEnabled={4}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
// do not accept plain text
params.add(new Object[] { caKeyType, certKeyType, keyPassword, false, true });
// support plain text and client enables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, true });
// support plain text and client disables tls
params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, false });
}
}
}
return params;
}
@BeforeClass
public static void setUpBeforeClass() throws IOException {
Security.addProvider(new BouncyCastleProvider());
File dir = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
.getCanonicalFile();
FileUtils.forceMkdir(dir);
// server must enable tls
CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
PROVIDER = new X509TestContextProvider(CONF, dir);
EVENT_LOOP_GROUP_CONFIG =
new NettyEventLoopGroupConfig(CONF, TestNettyTlsIPC.class.getSimpleName());
NettyRpcClientConfigHelper.setEventLoopConfig(CONF, EVENT_LOOP_GROUP_CONFIG.group(),
EVENT_LOOP_GROUP_CONFIG.clientChannelClass());
NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, EVENT_LOOP_GROUP_CONFIG.group(),
EVENT_LOOP_GROUP_CONFIG.clientChannelClass());
SERVER = mock(HRegionServer.class);
when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
}
@AfterClass
public static void tearDownAfterClass() throws InterruptedException {
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync();
UTIL.cleanupTestDir();
}
@Before
public void setUp() throws IOException {
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlainText);
CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, clientTlsEnabled);
}
@After
public void tearDown() {
x509TestContext.clearConfigurations();
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
}
@Override
protected RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true);
}
@Override
protected AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf) {
return new NettyRpcClient(conf) {
@Override
protected Codec getCodec() {
return null;
}
};
}
@Override
protected AbstractRpcClient<?> createRpcClient(Configuration conf) {
return new NettyRpcClient(conf);
}
@Override
protected AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(Configuration conf)
throws IOException {
return new NettyRpcClient(conf) {
@Override
protected boolean isTcpNoDelay() {
throw new RuntimeException("Injected fault");
}
};
}
@Override
protected RpcServer createTestFailingRpcServer(String name,
List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException {
return new FailingNettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler);
}
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThrows;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.Security;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestNettyTlsIPCRejectPlainText {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNettyTlsIPCRejectPlainText.class);
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static File DIR;
private static X509TestContextProvider PROVIDER;
@Parameterized.Parameter(0)
public X509KeyType caKeyType;
@Parameterized.Parameter(1)
public X509KeyType certKeyType;
@Parameterized.Parameter(2)
public String keyPassword;
private X509TestContext x509TestContext;
private RpcServer rpcServer;
private RpcClient rpcClient;
@Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}")
public static List<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
params.add(new Object[] { caKeyType, certKeyType, keyPassword });
}
}
}
return params;
}
@BeforeClass
public static void setUpBeforeClass() throws IOException {
Security.addProvider(new BouncyCastleProvider());
DIR = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
.getCanonicalFile();
FileUtils.forceMkdir(DIR);
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false);
PROVIDER = new X509TestContextProvider(conf, DIR);
}
@AfterClass
public static void tearDownAfterClass() {
Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
UTIL.cleanupTestDir();
}
@Before
public void setUp() throws IOException {
x509TestContext = PROVIDER.get(caKeyType, certKeyType, keyPassword);
x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
Configuration conf = UTIL.getConfiguration();
rpcServer = new NettyRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
rpcServer.start();
rpcClient = new NettyRpcClient(conf);
}
@After
public void tearDown() throws IOException {
if (rpcServer != null) {
rpcServer.stop();
}
Closeables.close(rpcClient, true);
x509TestContext.clearConfigurations();
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
}
@Test
public void testReject() throws IOException, ServiceException {
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress());
ServiceException se = assertThrows(ServiceException.class,
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello world").build()));
assertThat(se.getCause(), instanceOf(ConnectionClosedException.class));
}
}

View File

@ -1,209 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import java.net.InetSocketAddress;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.io.crypto.tls.BaseX509ParameterizedTestCase;
import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, MediumTests.class })
public class TestTlsIPC extends BaseX509ParameterizedTestCase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTlsIPC.class);
@Parameterized.Parameter()
public X509KeyType caKeyType;
@Parameterized.Parameter(value = 1)
public X509KeyType certKeyType;
@Parameterized.Parameter(value = 2)
public String keyPassword;
@Parameterized.Parameter(value = 3)
public Integer paramIndex;
@Parameterized.Parameters(
name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
public static Collection<Object[]> data() {
List<Object[]> params = new ArrayList<>();
int paramIndex = 0;
for (X509KeyType caKeyType : X509KeyType.values()) {
for (X509KeyType certKeyType : X509KeyType.values()) {
for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) {
params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
}
}
}
return params;
}
private static final String RPC_CLIENT_IMPL = NettyRpcClient.class.getName();
private static final String RPC_SERVER_IMPL = NettyRpcServer.class.getName();
private static final String HOST = "localhost";
private UserGroupInformation ugi;
private Configuration tlsConfiguration;
private Configuration clientConf;
private Configuration serverConf;
@Override
public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword,
Integer paramIndex) throws Exception {
super.init(caKeyType, certKeyType, keyPassword, paramIndex);
x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
tlsConfiguration = x509TestContext.getHbaseConf();
}
@Before
public void setUpTest() throws Exception {
init(caKeyType, certKeyType, keyPassword, paramIndex);
String clientusername = "testuser";
ugi =
UserGroupInformation.createUserForTesting(clientusername, new String[] { clientusername });
clientConf = HBaseConfiguration.create(tlsConfiguration);
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
serverConf = HBaseConfiguration.create(tlsConfiguration);
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
}
@After
public void cleanUp() {
x509TestContext.clearSystemProperties();
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_OCSP);
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_CLR);
x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
System.clearProperty("com.sun.net.ssl.checkRevocation");
System.clearProperty("com.sun.security.enableCRLDP");
Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
}
@Test
public void testNoPlaintext() throws Exception {
setTLSEncryption(true, false, true);
callRpcService(User.create(ugi));
}
@Test
public void testRejectPlaintext() {
setTLSEncryption(true, false, false);
Assert.assertThrows(ConnectionClosedException.class, () -> callRpcService(User.create(ugi)));
}
@Test
public void testAcceptPlaintext() throws Exception {
setTLSEncryption(true, true, false);
callRpcService(User.create(ugi));
}
void setTLSEncryption(Boolean server, Boolean acceptPlaintext, Boolean client) {
serverConf.set(HBASE_SERVER_NETTY_TLS_ENABLED, server.toString());
serverConf.set(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlaintext.toString());
clientConf.set(HBASE_CLIENT_NETTY_TLS_ENABLED, client.toString());
}
/**
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
* the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists
.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
isa, serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient =
RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
TestSecureIPC.TestThread th = new TestSecureIPC.TestThread(stub);
AtomicReference<Throwable> exception = new AtomicReference<>();
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
exception.set(ex);
}
};
th.setUncaughtExceptionHandler(exceptionHandler);
th.start();
th.join();
if (exception.get() != null) {
// throw root cause.
while (exception.get().getCause() != null) {
exception.set(exception.get().getCause());
}
throw (Exception) exception.get();
}
} finally {
rpcServer.stop();
}
}
}

View File

@ -106,7 +106,7 @@ public class TestTlsWithKerberos {
.setTempDir(new File(TEST_UTIL.getDataTestDir().toUri().getPath())) .setTempDir(new File(TEST_UTIL.getDataTestDir().toUri().getPath()))
.setKeyStorePassword("Pa$$word").setKeyStoreKeyType(X509KeyType.RSA) .setKeyStorePassword("Pa$$word").setKeyStoreKeyType(X509KeyType.RSA)
.setTrustStoreKeyType(X509KeyType.RSA).setTrustStorePassword("Pa$$word").build(); .setTrustStoreKeyType(X509KeyType.RSA).setTrustStorePassword("Pa$$word").build();
x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS); x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
} }
@AfterClass @AfterClass
@ -123,10 +123,10 @@ public class TestTlsWithKerberos {
krbKeytab = getKeytabFileForTesting(); krbKeytab = getKeytabFileForTesting();
krbPrincipal = getPrincipalForTesting(); krbPrincipal = getPrincipalForTesting();
ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
clientConf = HBaseConfiguration.create(x509TestContext.getHbaseConf()); clientConf = HBaseConfiguration.create(x509TestContext.getConf());
setSecuredConfiguration(clientConf); setSecuredConfiguration(clientConf);
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL); clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
serverConf = HBaseConfiguration.create(x509TestContext.getHbaseConf()); serverConf = HBaseConfiguration.create(x509TestContext.getConf());
setSecuredConfiguration(serverConf); setSecuredConfiguration(serverConf);
serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL); serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
} }