diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java index fdec5f21bb..b526e25e6b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java @@ -39,7 +39,6 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.file.monitor.LastModifiedMonitor; import org.apache.nifi.util.file.monitor.SynchronousFileWatcher; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.nio.file.Paths; @@ -58,7 +57,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; @Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"}) -@CapabilityDescription("A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, the remaining columns are returned as a Record.") +@CapabilityDescription( + "A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, " + + "the columns are returned as a Record. All returned fields will be strings." +) public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService { private static final String KEY = "key"; @@ -69,15 +71,16 @@ public class CSVRecordLookupService extends AbstractControllerService implements new PropertyDescriptor.Builder() .name("csv-file") .displayName("CSV File") - .description("A CSV file.") + .description("A CSV file that will serve as the data source.") .required(true) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .expressionLanguageSupported(true) .build(); static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder() - .name("CSV Format") - .description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.") + .name("csv-format") + .displayName("CSV Format") + .description("Specifies which \"format\" the CSV data is in.") .expressionLanguageSupported(false) .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet())) .defaultValue(CSVFormat.Predefined.Default.toString()) @@ -88,7 +91,8 @@ public class CSVRecordLookupService extends AbstractControllerService implements new PropertyDescriptor.Builder() .name("lookup-key-column") .displayName("Lookup Key Column") - .description("Lookup key column.") + .description("The field in the CSV file that will serve as the lookup key. " + + "This is the field that will be matched against the property specified in the lookup processor.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -107,7 +111,7 @@ public class CSVRecordLookupService extends AbstractControllerService implements private List properties; - private volatile ConcurrentMap> cache; + private volatile ConcurrentMap cache; private volatile String csvFile; @@ -131,15 +135,16 @@ public class CSVRecordLookupService extends AbstractControllerService implements final FileReader reader = new FileReader(csvFile); final CSVParser records = csvFormat.withFirstRecordAsHeader().parse(reader); - this.cache = new ConcurrentHashMap<>(); + ConcurrentHashMap cache = new ConcurrentHashMap<>(); + RecordSchema lookupRecordSchema = null; for (final CSVRecord record : records) { final String key = record.get(lookupKeyColumn); if (StringUtils.isBlank(key)) { throw new IllegalStateException("Empty lookup key encountered in: " + csvFile); - } else if (!ignoreDuplicates && this.cache.containsKey(key)) { + } else if (!ignoreDuplicates && cache.containsKey(key)) { throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + csvFile); - } else if (ignoreDuplicates && this.cache.containsKey(key)) { + } else if (ignoreDuplicates && cache.containsKey(key)) { logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, csvFile}); } @@ -150,9 +155,18 @@ public class CSVRecordLookupService extends AbstractControllerService implements properties.put(k, v); } }); - cache.put(key, properties); + + if (lookupRecordSchema == null) { + List recordFields = new ArrayList<>(properties.size()); + properties.forEach((k, v) -> recordFields.add(new RecordField(k, RecordFieldType.STRING.getDataType()))); + lookupRecordSchema = new SimpleRecordSchema(recordFields); + } + + cache.put(key, new MapRecord(lookupRecordSchema, properties)); } + this.cache = cache; + if (cache.isEmpty()) { logger.warn("Lookup table is empty after reading file: " + csvFile); } @@ -178,7 +192,7 @@ public class CSVRecordLookupService extends AbstractControllerService implements } @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException { + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException { this.csvFile = context.getProperty(CSV_FILE).getValue(); this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat(); this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).getValue(); @@ -203,25 +217,14 @@ public class CSVRecordLookupService extends AbstractControllerService implements } try { - if (watcher != null && watcher.checkAndReset()) { + if (watcher.checkAndReset()) { loadCache(); } } catch (final IllegalStateException | IOException e) { throw new LookupFailureException(e.getMessage(), e); } - final Record lookupRecord; - Map recordMap = cache.get(key); - if (recordMap != null) { - List recordFields = new ArrayList<>(recordMap.size()); - recordMap.forEach((k, v) -> recordFields.add(new RecordField(k, RecordFieldType.STRING.getDataType()))); - final RecordSchema lookupRecordSchema = new SimpleRecordSchema(recordFields); - lookupRecord = new MapRecord(lookupRecordSchema, recordMap); - } else { - lookupRecord = null; - } - - return Optional.ofNullable(lookupRecord); + return Optional.ofNullable(cache.get(key)); } @Override