NIFI-454: Use random ports instead of specific ports for running unit tests; updated abstract class and interface to expose the port being used

This commit is contained in:
Mark Payne 2015-06-05 13:56:45 -04:00
parent 739baa2e57
commit 315af02c59
4 changed files with 184 additions and 181 deletions

View File

@ -52,7 +52,6 @@ public abstract class AbstractCacheServer implements CacheServer {
private final SSLContext sslContext; private final SSLContext sslContext;
protected volatile boolean stopped = false; protected volatile boolean stopped = false;
private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>(); private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
;
private volatile ServerSocketChannel serverSocketChannel; private volatile ServerSocketChannel serverSocketChannel;
@ -62,6 +61,11 @@ public abstract class AbstractCacheServer implements CacheServer {
this.sslContext = sslContext; this.sslContext = sslContext;
} }
@Override
public int getPort() {
return serverSocketChannel == null ? this.port : serverSocketChannel.socket().getLocalPort();
}
@Override @Override
public void start() throws IOException { public void start() throws IOException {
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel = ServerSocketChannel.open();
@ -117,7 +121,7 @@ public abstract class AbstractCacheServer implements CacheServer {
return; return;
} }
try (final InputStream in = new BufferedInputStream(rawInputStream); try (final InputStream in = new BufferedInputStream(rawInputStream);
final OutputStream out = new BufferedOutputStream(rawOutputStream)) { final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);

View File

@ -24,4 +24,6 @@ public interface CacheServer {
void stop() throws IOException; void stop() throws IOException;
int getPort();
} }

View File

@ -35,39 +35,39 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port") .name("Port")
.description("The port to listen on for incoming connections") .description("The port to listen on for incoming connections")
.required(true) .required(true)
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.defaultValue("4557") .defaultValue("4557")
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service") .name("SSL Context Service")
.description("If specified, this service will be used to create an SSL Context that will be used " .description("If specified, this service will be used to create an SSL Context that will be used "
+ "to secure communications; if not specified, communications will not be secure") + "to secure communications; if not specified, communications will not be secure")
.required(false) .required(false)
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Cache Entries") .name("Maximum Cache Entries")
.description("The maximum number of cache entries that the cache can hold") .description("The maximum number of cache entries that the cache can hold")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000") .defaultValue("10000")
.build(); .build();
public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder() public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
.name("Eviction Strategy") .name("Eviction Strategy")
.description("Determines which strategy should be used to evict values from the cache to make room for new entries") .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
.required(true) .required(true)
.allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
.defaultValue(EVICTION_STRATEGY_LFU) .defaultValue(EVICTION_STRATEGY_LFU)
.build(); .build();
public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
.name("Persistence Directory") .name("Persistence Directory")
.description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
.required(false) .required(false)
.addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
.build(); .build();
private volatile CacheServer cacheServer; private volatile CacheServer cacheServer;
@ -103,5 +103,12 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
shutdownServer(); shutdownServer();
} }
/**
* @return the port that the server is listening on, or -1 if the server has not been started
*/
public int getPort() {
return cacheServer == null ? -1 : cacheServer.getPort();
}
protected abstract CacheServer createCacheServer(ConfigurationContext context); protected abstract CacheServer createCacheServer(ConfigurationContext context);
} }

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.nifi.distributed.cache.server; package org.apache.nifi.distributed.cache.server;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -29,8 +27,11 @@ import java.io.OutputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
@ -38,16 +39,16 @@ import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer; import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceInitializationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.commons.lang3.SerializationException; import org.apache.nifi.util.TestRunners;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -65,20 +66,24 @@ public class TestServerAndClient {
LOGGER = LoggerFactory.getLogger(TestServerAndClient.class); LOGGER = LoggerFactory.getLogger(TestServerAndClient.class);
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testNonPersistentSetServerAndClient() throws InitializationException, IOException { public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server // Create server
final DistributedSetCacheServer server = new DistributedSetCacheServer(); final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); final DistributedSetCacheServer server = new SetServer();
server.initialize(serverInitContext); runner.addControllerService("server", server);
runner.enableControllerService(server);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); final DistributedSetCacheClientService client = createClient(server.getPort());
final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup());
server.startServer(serverContext);
final DistributedSetCacheClientService client = createClient();
final Serializer<String> serializer = new StringSerializer(); final Serializer<String> serializer = new StringSerializer();
final boolean added = client.addIfAbsent("test", serializer); final boolean added = client.addIfAbsent("test", serializer);
assertTrue(added); assertTrue(added);
@ -98,24 +103,28 @@ public class TestServerAndClient {
server.shutdownServer(); server.shutdownServer();
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testPersistentSetServerAndClient() throws InitializationException, IOException { public void testPersistentSetServerAndClient() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
final DistributedSetCacheServer server = new DistributedSetCacheServer();
MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
server.initialize(serverInitContext);
final File dataFile = new File("target/cache-data"); final File dataFile = new File("target/cache-data");
deleteRecursively(dataFile); deleteRecursively(dataFile);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); // Create server
serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); final DistributedSetCacheServer server = new SetServer();
server.startServer(serverContext); runner.addControllerService("server", server);
runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.enableControllerService(server);
final DistributedSetCacheClientService client = createClient(); DistributedSetCacheClientService client = createClient(server.getPort());
final Serializer<String> serializer = new StringSerializer(); final Serializer<String> serializer = new StringSerializer();
final boolean added = client.addIfAbsent("test", serializer); final boolean added = client.addIfAbsent("test", serializer);
final boolean added2 = client.addIfAbsent("test2", serializer); final boolean added2 = client.addIfAbsent("test2", serializer);
@ -137,41 +146,45 @@ public class TestServerAndClient {
assertFalse(containedAfterRemove); assertFalse(containedAfterRemove);
server.shutdownServer(); server.shutdownServer();
client.close();
final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); final DistributedSetCacheServer newServer = new SetServer();
MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); runner.addControllerService("server2", newServer);
newServer.initialize(newServerInitContext); runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.enableControllerService(newServer);
final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, client = createClient(newServer.getPort());
newServerInitContext.getControllerServiceLookup());
newServer.startServer(newServerContext);
assertFalse(client.contains("test", serializer)); assertFalse(client.contains("test", serializer));
assertTrue(client.contains("test2", serializer)); assertTrue(client.contains("test2", serializer));
newServer.shutdownServer(); newServer.shutdownServer();
client.close();
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server // Create server
final DistributedSetCacheServer server = new DistributedSetCacheServer();
MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
server.initialize(serverInitContext);
final File dataFile = new File("target/cache-data"); final File dataFile = new File("target/cache-data");
deleteRecursively(dataFile); deleteRecursively(dataFile);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); // Create server
serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); final DistributedSetCacheServer server = new SetServer();
runner.addControllerService("server", server);
runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
runner.enableControllerService(server);
final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); DistributedSetCacheClientService client = createClient(server.getPort());
server.startServer(serverContext);
final DistributedSetCacheClientService client = createClient();
final Serializer<String> serializer = new StringSerializer(); final Serializer<String> serializer = new StringSerializer();
final boolean added = client.addIfAbsent("test", serializer); final boolean added = client.addIfAbsent("test", serializer);
waitABit(); waitABit();
@ -199,13 +212,13 @@ public class TestServerAndClient {
server.shutdownServer(); server.shutdownServer();
final DistributedSetCacheServer newServer = new DistributedSetCacheServer();
MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2");
newServer.initialize(newServerInitContext);
final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, final DistributedSetCacheServer newServer = new SetServer();
newServerInitContext.getControllerServiceLookup()); runner.addControllerService("server2", newServer);
newServer.startServer(newServerContext); runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.enableControllerService(newServer);
client.close();
client = createClient(newServer.getPort());
assertTrue(client.contains("test", serializer)); assertTrue(client.contains("test", serializer));
assertTrue(client.contains("test2", serializer)); assertTrue(client.contains("test2", serializer));
@ -213,29 +226,33 @@ public class TestServerAndClient {
assertTrue(client.contains("test4", serializer)); assertTrue(client.contains("test4", serializer));
newServer.shutdownServer(); newServer.shutdownServer();
client.close();
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
final DistributedSetCacheServer server = new DistributedSetCacheServer();
MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
server.initialize(serverInitContext);
final File dataFile = new File("target/cache-data"); final File dataFile = new File("target/cache-data");
deleteRecursively(dataFile); deleteRecursively(dataFile);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); // Create server
serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); final DistributedSetCacheServer server = new SetServer();
serverProperties.put(DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); runner.addControllerService("server", server);
runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
runner.enableControllerService(server);
final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); DistributedSetCacheClientService client = createClient(server.getPort());
server.startServer(serverContext);
final DistributedSetCacheClientService client = createClient();
final Serializer<String> serializer = new StringSerializer(); final Serializer<String> serializer = new StringSerializer();
// add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond
@ -267,35 +284,42 @@ public class TestServerAndClient {
assertTrue(client.contains("test3", serializer)); assertTrue(client.contains("test3", serializer));
server.shutdownServer(); server.shutdownServer();
client.close();
final DistributedSetCacheServer newServer = new DistributedSetCacheServer();
MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2");
newServer.initialize(newServerInitContext);
final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, final DistributedSetCacheServer newServer = new SetServer();
newServerInitContext.getControllerServiceLookup()); runner.addControllerService("server2", newServer);
newServer.startServer(newServerContext); runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.setProperty(newServer, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
runner.setProperty(newServer, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
runner.enableControllerService(newServer);
client = createClient(newServer.getPort());
assertFalse(client.contains("test", serializer)); assertFalse(client.contains("test", serializer));
assertTrue(client.contains("test2", serializer)); assertTrue(client.contains("test2", serializer));
assertTrue(client.contains("test3", serializer)); assertTrue(client.contains("test3", serializer));
assertTrue(client.contains("test4", serializer)); assertTrue(client.contains("test4", serializer));
newServer.shutdownServer(); newServer.shutdownServer();
client.close();
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException { public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException {
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); /**
// Create server * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
final DistributedMapCacheServer server = new DistributedMapCacheServer(); * See: https://issues.apache.org/jira/browse/NIFI-437
MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); */
server.initialize(serverInitContext); Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup());
server.startServer(serverContext); // Create server
final DistributedMapCacheServer server = new MapServer();
final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
runner.addControllerService("server", server);
runner.enableControllerService(server);
DistributedMapCacheClientService client = new DistributedMapCacheClientService(); DistributedMapCacheClientService client = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
@ -303,6 +327,7 @@ public class TestServerAndClient {
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
client.cacheConfig(clientContext); client.cacheConfig(clientContext);
@ -338,7 +363,7 @@ public class TestServerAndClient {
try { try {
client.containsKey("testKey", keySerializer); client.containsKey("testKey", keySerializer);
fail("Should be closed and not accessible"); fail("Should be closed and not accessible");
} catch (Exception e) { } catch (final Exception e) {
} }
client = null; client = null;
@ -346,12 +371,11 @@ public class TestServerAndClient {
clientContext = null; clientContext = null;
DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2"); MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
client2.initialize(clientInitContext2); client2.initialize(clientInitContext2);
MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties,
clientInitContext2.getControllerServiceLookup()); clientInitContext2.getControllerServiceLookup());
client2.cacheConfig(clientContext2); client2.cacheConfig(clientContext2);
assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer)); assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer));
assertTrue(client2.containsKey("testKey", keySerializer)); assertTrue(client2.containsKey("testKey", keySerializer));
@ -360,13 +384,11 @@ public class TestServerAndClient {
try { try {
client2.containsKey("testKey", keySerializer); client2.containsKey("testKey", keySerializer);
fail("Should have blown exception!"); fail("Should have blown exception!");
} catch (ConnectException e) { } catch (final ConnectException e) {
client2 = null; client2 = null;
clientContext2 = null; clientContext2 = null;
clientInitContext2 = null; clientInitContext2 = null;
} }
Thread.sleep(2000);
System.gc();
LOGGER.debug("end testNonPersistentMapServerAndClient"); LOGGER.debug("end testNonPersistentMapServerAndClient");
} }
@ -377,12 +399,12 @@ public class TestServerAndClient {
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437 * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437
*/ */
Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server // Create server
final DistributedMapCacheServer server = new DistributedMapCacheServer(); final DistributedMapCacheServer server = new MapServer();
MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
server.initialize(serverInitContext); server.initialize(serverInitContext);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
@ -428,65 +450,6 @@ public class TestServerAndClient {
server.shutdownServer(); server.shutdownServer();
} }
@Ignore
@Test
public void testSSLWith2RequestsWithServerTimeout() throws InitializationException, IOException, InterruptedException {
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create SSLContext Service
final StandardSSLContextService sslService = new StandardSSLContextService();
final MockControllerServiceInitializationContext sslServerInitContext = new MockControllerServiceInitializationContext(sslService,
"ssl-context");
sslService.initialize(sslServerInitContext);
final Map<PropertyDescriptor, String> sslServerProps = new HashMap<>();
sslServerProps.put(StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
sslServerProps.put(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
sslServerProps.put(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
sslServerProps.put(StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
sslServerProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
sslServerProps.put(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
MockConfigurationContext sslServerContext = new MockConfigurationContext(sslServerProps, sslServerInitContext);
sslService.onConfigured(sslServerContext);
sslService.createSSLContext(ClientAuth.REQUIRED);
// Create server
final DistributedMapCacheServer server = new DistributedMapCacheServer();
final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
server.initialize(serverInitContext);
final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
serverProperties.put(DistributedMapCacheServer.SSL_CONTEXT_SERVICE, "ssl-context");
final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup());
server.startServer(serverContext);
DistributedMapCacheClientService client = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext);
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
clientProperties.put(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE, "ssl-context");
MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
client.cacheConfig(clientContext);
final Serializer<String> valueSerializer = new StringSerializer();
final Serializer<String> keySerializer = new StringSerializer();
final Deserializer<String> deserializer = new StringDeserializer();
final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
assertEquals(null, original);
Thread.sleep(30000);
try {
final boolean contains = client.containsKey("testKey", keySerializer);
assertTrue(contains);
} catch (IOException e) {
// this is due to the server timing out in the middle of this request
assertTrue(e.getMessage().contains("Channel is closed"));
}
server.shutdownServer();
}
private void waitABit() { private void waitABit() {
try { try {
Thread.sleep(10L); Thread.sleep(10L);
@ -494,13 +457,14 @@ public class TestServerAndClient {
} }
} }
private DistributedSetCacheClientService createClient() throws InitializationException { private DistributedSetCacheClientService createClient(final int port) throws InitializationException {
final DistributedSetCacheClientService client = new DistributedSetCacheClientService(); final DistributedSetCacheClientService client = new DistributedSetCacheClientService();
MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext); client.initialize(clientInitContext);
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost"); clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port));
final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
client.onConfigured(clientContext); client.onConfigured(clientContext);
@ -519,7 +483,7 @@ public class TestServerAndClient {
@Override @Override
public String deserialize(final byte[] input) throws DeserializationException, IOException { public String deserialize(final byte[] input) throws DeserializationException, IOException {
return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8); return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);
} }
} }
@ -543,4 +507,30 @@ public class TestServerAndClient {
} }
} }
} }
private static List<PropertyDescriptor> replacePortDescriptor(final List<PropertyDescriptor> descriptors) {
descriptors.remove(DistributedCacheServer.PORT);
descriptors.add(new PropertyDescriptor.Builder()
.name("Port")
.description("The port to listen on for incoming connections")
.required(true)
.addValidator(StandardValidators.createLongValidator(0L, 65535L, true))
.defaultValue("0")
.build());
return descriptors;
}
private static class SetServer extends DistributedSetCacheServer {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return replacePortDescriptor(super.getSupportedPropertyDescriptors());
}
}
private static class MapServer extends DistributedMapCacheServer {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return replacePortDescriptor(super.getSupportedPropertyDescriptors());
}
}
} }