NIFI-3970 added changes from a code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2334
This commit is contained in:
Mike Thomsen 2017-12-01 08:45:39 -05:00 committed by Matthew Burgess
parent 63840377dd
commit bfe92b9000
1 changed file with 28 additions and 25 deletions

View File

@ -39,7 +39,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor; import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher; import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -58,7 +57,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"}) @Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"})
@CapabilityDescription("A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, the remaining columns are returned as a Record.") @CapabilityDescription(
"A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, " +
"the columns are returned as a Record. All returned fields will be strings."
)
public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService { public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService {
private static final String KEY = "key"; private static final String KEY = "key";
@ -69,15 +71,16 @@ public class CSVRecordLookupService extends AbstractControllerService implements
new PropertyDescriptor.Builder() new PropertyDescriptor.Builder()
.name("csv-file") .name("csv-file")
.displayName("CSV File") .displayName("CSV File")
.description("A CSV file.") .description("A CSV file that will serve as the data source.")
.required(true) .required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder() static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
.name("CSV Format") .name("csv-format")
.description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.") .displayName("CSV Format")
.description("Specifies which \"format\" the CSV data is in.")
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet())) .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
.defaultValue(CSVFormat.Predefined.Default.toString()) .defaultValue(CSVFormat.Predefined.Default.toString())
@ -88,7 +91,8 @@ public class CSVRecordLookupService extends AbstractControllerService implements
new PropertyDescriptor.Builder() new PropertyDescriptor.Builder()
.name("lookup-key-column") .name("lookup-key-column")
.displayName("Lookup Key Column") .displayName("Lookup Key Column")
.description("Lookup key column.") .description("The field in the CSV file that will serve as the lookup key. " +
"This is the field that will be matched against the property specified in the lookup processor.")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
@ -107,7 +111,7 @@ public class CSVRecordLookupService extends AbstractControllerService implements
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private volatile ConcurrentMap<String, Map<String, Object>> cache; private volatile ConcurrentMap<String, Record> cache;
private volatile String csvFile; private volatile String csvFile;
@ -131,15 +135,16 @@ public class CSVRecordLookupService extends AbstractControllerService implements
final FileReader reader = new FileReader(csvFile); final FileReader reader = new FileReader(csvFile);
final CSVParser records = csvFormat.withFirstRecordAsHeader().parse(reader); final CSVParser records = csvFormat.withFirstRecordAsHeader().parse(reader);
this.cache = new ConcurrentHashMap<>(); ConcurrentHashMap<String, Record> cache = new ConcurrentHashMap<>();
RecordSchema lookupRecordSchema = null;
for (final CSVRecord record : records) { for (final CSVRecord record : records) {
final String key = record.get(lookupKeyColumn); final String key = record.get(lookupKeyColumn);
if (StringUtils.isBlank(key)) { if (StringUtils.isBlank(key)) {
throw new IllegalStateException("Empty lookup key encountered in: " + csvFile); throw new IllegalStateException("Empty lookup key encountered in: " + csvFile);
} else if (!ignoreDuplicates && this.cache.containsKey(key)) { } else if (!ignoreDuplicates && cache.containsKey(key)) {
throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + csvFile); throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + csvFile);
} else if (ignoreDuplicates && this.cache.containsKey(key)) { } else if (ignoreDuplicates && cache.containsKey(key)) {
logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, csvFile}); logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, csvFile});
} }
@ -150,9 +155,18 @@ public class CSVRecordLookupService extends AbstractControllerService implements
properties.put(k, v); properties.put(k, v);
} }
}); });
cache.put(key, properties);
if (lookupRecordSchema == null) {
List<RecordField> recordFields = new ArrayList<>(properties.size());
properties.forEach((k, v) -> recordFields.add(new RecordField(k, RecordFieldType.STRING.getDataType())));
lookupRecordSchema = new SimpleRecordSchema(recordFields);
} }
cache.put(key, new MapRecord(lookupRecordSchema, properties));
}
this.cache = cache;
if (cache.isEmpty()) { if (cache.isEmpty()) {
logger.warn("Lookup table is empty after reading file: " + csvFile); logger.warn("Lookup table is empty after reading file: " + csvFile);
} }
@ -178,7 +192,7 @@ public class CSVRecordLookupService extends AbstractControllerService implements
} }
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException { public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
this.csvFile = context.getProperty(CSV_FILE).getValue(); this.csvFile = context.getProperty(CSV_FILE).getValue();
this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat(); this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).getValue(); this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).getValue();
@ -203,25 +217,14 @@ public class CSVRecordLookupService extends AbstractControllerService implements
} }
try { try {
if (watcher != null && watcher.checkAndReset()) { if (watcher.checkAndReset()) {
loadCache(); loadCache();
} }
} catch (final IllegalStateException | IOException e) { } catch (final IllegalStateException | IOException e) {
throw new LookupFailureException(e.getMessage(), e); throw new LookupFailureException(e.getMessage(), e);
} }
final Record lookupRecord; return Optional.ofNullable(cache.get(key));
Map<String, Object> recordMap = cache.get(key);
if (recordMap != null) {
List<RecordField> recordFields = new ArrayList<>(recordMap.size());
recordMap.forEach((k, v) -> recordFields.add(new RecordField(k, RecordFieldType.STRING.getDataType())));
final RecordSchema lookupRecordSchema = new SimpleRecordSchema(recordFields);
lookupRecord = new MapRecord(lookupRecordSchema, recordMap);
} else {
lookupRecord = null;
}
return Optional.ofNullable(lookupRecord);
} }
@Override @Override