diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 62deae584d..da457bdc92 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,7 +79,7 @@ public class PersistentMapCache implements MapCache { records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue())); } - wali.update(Collections.singletonList(record), false); + wali.update(records, false); final long modCount = modifications.getAndIncrement(); if ( modCount > 0 && modCount % 100000 == 0 ) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index c8f94788fb..0f5675c53a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -235,6 +235,73 @@ public class TestServerAndClient { client.close(); } + @Test + public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException { + /** + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug + * See: https://issues.apache.org/jira/browse/NIFI-437 + */ + Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); + // Create server + final File dataFile = new File("target/cache-data"); + deleteRecursively(dataFile); + + // Create server + final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + final DistributedMapCacheServer server = new MapServer(); + runner.addControllerService("server", server); + runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3"); + runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU); + runner.enableControllerService(server); + + DistributedMapCacheClientService client = createMapClient(server.getPort()); + final Serializer serializer = new StringSerializer(); + final boolean added = client.putIfAbsent("test", "1", serializer, serializer); + waitABit(); + final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer); + waitABit(); + final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer); + waitABit(); + assertTrue(added); + assertTrue(added2); + assertTrue(added3); + + final boolean contains = client.containsKey("test", serializer); + final boolean contains2 = client.containsKey("test2", serializer); + assertTrue(contains); + assertTrue(contains2); + + final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer); + assertFalse(addedAgain); + + final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer); + assertTrue(added4); + + // ensure that added3 was evicted because it was used least frequently + assertFalse(client.containsKey("test3", serializer)); + + server.shutdownServer(); + + final DistributedMapCacheServer newServer = new MapServer(); + runner.addControllerService("server2", newServer); + runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.enableControllerService(newServer); + client.close(); + client = createMapClient(newServer.getPort()); + + assertTrue(client.containsKey("test", serializer)); + assertTrue(client.containsKey("test2", serializer)); + assertFalse(client.containsKey("test3", serializer)); + assertTrue(client.containsKey("test4", serializer)); + + newServer.shutdownServer(); + client.close(); + } + @Test public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { /** @@ -628,6 +695,20 @@ public class TestServerAndClient { return client; } + private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException { + final DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + client.initialize(clientInitContext); + + final Map clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port)); + final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); + client.cacheConfig(clientContext); + + return client; + } + private static class StringSerializer implements Serializer { @Override