NIFI-3582: Fixing PersistentMapCache bug that skipped persistent evictions

This closes #1592.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Joe Gresock 2017-03-13 16:01:37 +00:00 committed by Bryan Bende
parent 9d4239be1e
commit bd91390105
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 82 additions and 2 deletions

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -80,7 +79,7 @@ public class PersistentMapCache implements MapCache {
records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue()));
}
wali.update(Collections.singletonList(record), false);
wali.update(records, false);
final long modCount = modifications.getAndIncrement();
if ( modCount > 0 && modCount % 100000 == 0 ) {

View File

@ -235,6 +235,73 @@ public class TestServerAndClient {
client.close();
}
@Test
public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
final File dataFile = new File("target/cache-data");
deleteRecursively(dataFile);
// Create server
final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
final DistributedMapCacheServer server = new MapServer();
runner.addControllerService("server", server);
runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3");
runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU);
runner.enableControllerService(server);
DistributedMapCacheClientService client = createMapClient(server.getPort());
final Serializer<String> serializer = new StringSerializer();
final boolean added = client.putIfAbsent("test", "1", serializer, serializer);
waitABit();
final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer);
waitABit();
final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer);
waitABit();
assertTrue(added);
assertTrue(added2);
assertTrue(added3);
final boolean contains = client.containsKey("test", serializer);
final boolean contains2 = client.containsKey("test2", serializer);
assertTrue(contains);
assertTrue(contains2);
final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer);
assertFalse(addedAgain);
final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer);
assertTrue(added4);
// ensure that added3 was evicted because it was used least frequently
assertFalse(client.containsKey("test3", serializer));
server.shutdownServer();
final DistributedMapCacheServer newServer = new MapServer();
runner.addControllerService("server2", newServer);
runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
runner.enableControllerService(newServer);
client.close();
client = createMapClient(newServer.getPort());
assertTrue(client.containsKey("test", serializer));
assertTrue(client.containsKey("test2", serializer));
assertFalse(client.containsKey("test3", serializer));
assertTrue(client.containsKey("test4", serializer));
newServer.shutdownServer();
client.close();
}
@Test
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
/**
@ -628,6 +695,20 @@ public class TestServerAndClient {
return client;
}
private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException {
final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext);
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
client.cacheConfig(clientContext);
return client;
}
private static class StringSerializer implements Serializer<String> {
@Override