NIFI-9200: Free cache on heap after disabling AbstractCSVLookupService

This closes #5372.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Lehel Boér 2021-09-08 12:33:52 +02:00 committed by Peter Turcsanyi
parent 6e059ef7c6
commit d96398feb8
4 changed files with 91 additions and 36 deletions

View File

@ -21,6 +21,7 @@ import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
@ -138,7 +139,7 @@ public class CSVRecordLookupService extends AbstractCSVLookupService implements
return Optional.empty();
}
final String key = (String)coordinates.get(KEY);
final String key = (String) coordinates.get(KEY);
if (StringUtils.isBlank(key)) {
return Optional.empty();
}
@ -159,4 +160,14 @@ public class CSVRecordLookupService extends AbstractCSVLookupService implements
return REQUIRED_KEYS;
}
@OnDisabled
public void onDisabled() {
cache = null;
}
// VisibleForTesting
boolean isCaching() {
return cache != null;
}
}

View File

@ -16,6 +16,20 @@
*/
package org.apache.nifi.lookup;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -30,20 +44,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
@CapabilityDescription("A reloadable CSV file-based lookup service. The first line of the csv file is considered as " +
"header.")
@ -111,7 +111,7 @@ public class SimpleCsvFileLookupService extends AbstractCSVLookupService impleme
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
super.onEnabled(context);
this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions().getValue();
try {
@ -148,4 +148,13 @@ public class SimpleCsvFileLookupService extends AbstractCSVLookupService impleme
return REQUIRED_KEYS;
}
@OnDisabled
public void onDisabled() {
cache = null;
}
// VisibleForTesting
boolean isCaching() {
return cache != null;
}
}

View File

@ -16,34 +16,34 @@
*/
package org.apache.nifi.lookup;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class TestCSVRecordLookupService {
private final static Optional<Record> EMPTY_RECORD = Optional.empty();
@Test
public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
public void testSimpleCsvRecordLookupService() throws InitializationException, IOException, LookupFailureException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
runner.addControllerService("csv-file-lookup-service", service);
runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
@ -53,7 +53,7 @@ public class TestCSVRecordLookupService {
final CSVRecordLookupService lookupService =
(CSVRecordLookupService) runner.getProcessContext()
.getControllerServiceLookup()
.getControllerService("csv-file-lookup-service");
.getControllerService("csv-record-lookup-service");
assertThat(lookupService, instanceOf(LookupService.class));
@ -70,11 +70,11 @@ public class TestCSVRecordLookupService {
}
@Test
public void testSimpleCsvFileLookupServiceWithCharset() throws InitializationException, IOException, LookupFailureException {
public void testSimpleCsvRecordLookupServiceWithCharset() throws InitializationException, LookupFailureException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
runner.addControllerService("csv-file-lookup-service", service);
runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test_Windows-31J.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
runner.setProperty(service, CSVRecordLookupService.CHARSET, "Windows-31J");
@ -111,5 +111,21 @@ public class TestCSVRecordLookupService {
assertEquals("my_value with an escaped |.", my_key.get().getAsString("value"));
}
@Test
public void testCacheIsClearedWhenDisableService() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
runner.addControllerService("csv-record-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.isCaching());
runner.disableControllerService(service);
assertFalse(service.isCaching());
}
}

View File

@ -16,28 +16,29 @@
*/
package org.apache.nifi.lookup;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class TestSimpleCsvFileLookupService {
final static Optional<String> EMPTY_STRING = Optional.empty();
@Test
public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
public void testSimpleCsvFileLookupService() throws InitializationException, LookupFailureException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
@ -50,9 +51,9 @@ public class TestSimpleCsvFileLookupService {
runner.assertValid(service);
final SimpleCsvFileLookupService lookupService =
(SimpleCsvFileLookupService) runner.getProcessContext()
.getControllerServiceLookup()
.getControllerService("csv-file-lookup-service");
(SimpleCsvFileLookupService) runner.getProcessContext()
.getControllerServiceLookup()
.getControllerService("csv-file-lookup-service");
assertThat(lookupService, instanceOf(LookupService.class));
@ -106,4 +107,22 @@ public class TestSimpleCsvFileLookupService {
final Optional<String> value = service.lookup(Collections.singletonMap("key", "my_key"));
assertEquals(Optional.of("my_value with an escaped |."), value);
}
@Test
public void testCacheIsClearedWhenDisableService() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final CSVRecordLookupService service = new CSVRecordLookupService();
runner.addControllerService("csv-file-lookup-service", service);
runner.setProperty(service, CSVRecordLookupService.CSV_FILE, "src/test/resources/test.csv");
runner.setProperty(service, CSVRecordLookupService.CSV_FORMAT, "RFC4180");
runner.setProperty(service, CSVRecordLookupService.LOOKUP_KEY_COLUMN, "key");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.isCaching());
runner.disableControllerService(service);
assertFalse(service.isCaching());
}
}