mirror of
https://github.com/apache/nifi.git
synced 2025-03-06 09:29:33 +00:00
NIFI-7770 Add features to csv lookup services
Add the following functionalities: - Custom value separator (default is comma) - Custom quote char to use (default is " i.e. quote sign) - Quote mode - Escape character to use (default is no escape character) - Comment marker - Trim fields - Character set to use The above features use a common implementation with "CSVReader". Also append a sentence to the capability description that first line of csv file is considered header. Setting custom header instead of using the first line is not supported (yet). Also, a minor refactor: CSVRecordLoopkupService and SimpleCsvFileLookupService now share common logic in implementation. CSV Format is extended to the same list as CSVReader. In addition, lookup services still have the "default" csv format for compatibility reasons. Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4494.
This commit is contained in:
parent
44738b72cd
commit
73b7ff8fd4
@ -186,6 +186,7 @@
|
||||
<exclude>src/test/resources/complex.avsc</exclude>
|
||||
<exclude>src/test/resources/simple.avsc</exclude>
|
||||
<exclude>src/test/resources/test.csv</exclude>
|
||||
<exclude>src/test/resources/test_sep_escape_comment.csv</exclude>
|
||||
<exclude>src/test/resources/test_Windows-31J.csv</exclude>
|
||||
<exclude>src/test/resources/test.properties</exclude>
|
||||
<exclude>src/test/resources/test.xml</exclude>
|
||||
|
@ -0,0 +1,150 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.csv.CSVUtils;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
|
||||
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
|
||||
|
||||
public abstract class AbstractCSVLookupService extends AbstractControllerService {
|
||||
|
||||
protected static final String KEY = "key";
|
||||
public static final AllowableValue RFC4180 = new AllowableValue("RFC4180", "RFC4180",
|
||||
"Same as RFC 4180. Available for compatibility reasons.");
|
||||
public static final AllowableValue DEFAULT = new AllowableValue("default", "Default Format",
|
||||
"Same as custom format. Available for compatibility reasons.");
|
||||
|
||||
public static final PropertyDescriptor CSV_FILE =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("csv-file")
|
||||
.displayName("CSV File")
|
||||
.description("Path to a CSV File in which the key value pairs can be looked up.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHARSET =
|
||||
new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(CSVUtils.CHARSET)
|
||||
.name("Character Set")
|
||||
.description("The Character Encoding that is used to decode the CSV file.")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CSV_FORMAT =
|
||||
new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(CSVUtils.CSV_FORMAT)
|
||||
.allowableValues(Stream.concat(
|
||||
CSVUtils.CSV_FORMAT.getAllowableValues().stream(),
|
||||
Stream.of(DEFAULT, RFC4180)).toArray(AllowableValue[]::new))
|
||||
.defaultValue(DEFAULT.getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-key-column")
|
||||
.displayName("Lookup Key Column")
|
||||
.description("The field in the CSV file that will serve as the lookup key. " +
|
||||
"This is the field that will be matched against the property specified in the lookup processor.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor IGNORE_DUPLICATES =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("ignore-duplicates")
|
||||
.displayName("Ignore Duplicates")
|
||||
.description("Ignore duplicate keys for records in the CSV file.")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
protected List<PropertyDescriptor> properties;
|
||||
|
||||
protected volatile String csvFile;
|
||||
|
||||
protected volatile CSVFormat csvFormat;
|
||||
|
||||
protected volatile String charset;
|
||||
|
||||
protected volatile String lookupKeyColumn;
|
||||
|
||||
protected volatile boolean ignoreDuplicates;
|
||||
|
||||
protected volatile SynchronousFileWatcher watcher;
|
||||
|
||||
protected final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
protected abstract void loadCache() throws IllegalStateException, IOException;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) {
|
||||
this.properties = new ArrayList<>();
|
||||
properties.add(CSV_FILE);
|
||||
properties.add(CSV_FORMAT);
|
||||
properties.add(CHARSET);
|
||||
properties.add(LOOKUP_KEY_COLUMN);
|
||||
properties.add(IGNORE_DUPLICATES);
|
||||
|
||||
properties.add(CSVUtils.VALUE_SEPARATOR);
|
||||
properties.add(CSVUtils.QUOTE_CHAR);
|
||||
properties.add(CSVUtils.QUOTE_MODE);
|
||||
properties.add(CSVUtils.COMMENT_MARKER);
|
||||
properties.add(CSVUtils.ESCAPE_CHAR);
|
||||
properties.add(CSVUtils.TRIM_FIELDS);
|
||||
}
|
||||
|
||||
public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
|
||||
this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
|
||||
if( context.getProperty(CSV_FORMAT).getValue().equalsIgnoreCase(RFC4180.getValue()) ) {
|
||||
this.csvFormat = CSVFormat.RFC4180;
|
||||
} else {
|
||||
this.csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());
|
||||
}
|
||||
this.charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
|
||||
this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
|
||||
this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
|
||||
this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVParser;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -24,12 +23,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
@ -37,16 +32,12 @@ import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
|
||||
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -55,92 +46,28 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"})
|
||||
@CapabilityDescription(
|
||||
"A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, " +
|
||||
"the columns are returned as a Record. All returned fields will be strings."
|
||||
"the columns are returned as a Record. All returned fields will be strings. The first line of the csv file " +
|
||||
"is considered as header."
|
||||
)
|
||||
public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService {
|
||||
|
||||
private static final String KEY = "key";
|
||||
public class CSVRecordLookupService extends AbstractCSVLookupService implements RecordLookupService {
|
||||
|
||||
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
|
||||
|
||||
public static final PropertyDescriptor CSV_FILE =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("csv-file")
|
||||
.displayName("CSV File")
|
||||
.description("A CSV file that will serve as the data source.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AbstractCSVLookupService.CSV_FORMAT)
|
||||
.name("csv-format")
|
||||
.displayName("CSV Format")
|
||||
.description("Specifies which \"format\" the CSV data is in.")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
|
||||
.defaultValue(CSVFormat.Predefined.Default.toString())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHARSET =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("The Character Encoding that is used to decode the CSV file.")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.defaultValue("UTF-8")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-key-column")
|
||||
.displayName("Lookup Key Column")
|
||||
.description("The field in the CSV file that will serve as the lookup key. " +
|
||||
"This is the field that will be matched against the property specified in the lookup processor.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor IGNORE_DUPLICATES =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("ignore-duplicates")
|
||||
.displayName("Ignore Duplicates")
|
||||
.description("Ignore duplicate keys for records in the CSV file.")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private volatile ConcurrentMap<String, Record> cache;
|
||||
|
||||
private volatile String csvFile;
|
||||
|
||||
private volatile CSVFormat csvFormat;
|
||||
|
||||
private volatile String charset;
|
||||
|
||||
private volatile String lookupKeyColumn;
|
||||
|
||||
private volatile boolean ignoreDuplicates;
|
||||
|
||||
private volatile SynchronousFileWatcher watcher;
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private void loadCache() throws IllegalStateException, IOException {
|
||||
@Override
|
||||
protected void loadCache() throws IllegalStateException, IOException {
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
final ComponentLog logger = getLogger();
|
||||
@ -194,30 +121,10 @@ public class CSVRecordLookupService extends AbstractControllerService implements
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(CSV_FILE);
|
||||
properties.add(CSV_FORMAT);
|
||||
properties.add(CHARSET);
|
||||
properties.add(LOOKUP_KEY_COLUMN);
|
||||
properties.add(IGNORE_DUPLICATES);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
@Override
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
|
||||
this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
|
||||
this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
|
||||
this.charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
|
||||
this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
|
||||
this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
|
||||
this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
|
||||
super.onEnabled(context);
|
||||
try {
|
||||
loadCache();
|
||||
} catch (final IllegalStateException e) {
|
||||
|
@ -17,26 +17,19 @@
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@ -44,63 +37,20 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
|
||||
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
|
||||
|
||||
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
|
||||
@CapabilityDescription("A reloadable CSV file-based lookup service")
|
||||
public class SimpleCsvFileLookupService extends AbstractControllerService implements StringLookupService {
|
||||
|
||||
private static final String KEY = "key";
|
||||
@CapabilityDescription("A reloadable CSV file-based lookup service. The first line of the csv file is considered as " +
|
||||
"header.")
|
||||
public class SimpleCsvFileLookupService extends AbstractCSVLookupService implements StringLookupService {
|
||||
|
||||
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
|
||||
|
||||
public static final PropertyDescriptor CSV_FILE =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("csv-file")
|
||||
.displayName("CSV File")
|
||||
.description("A CSV file.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
|
||||
.name("CSV Format")
|
||||
.description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
|
||||
.defaultValue(CSVFormat.Predefined.Default.toString())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHARSET =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("The Character Encoding that is used to decode the CSV file.")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.defaultValue("UTF-8")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-key-column")
|
||||
.displayName("Lookup Key Column")
|
||||
.description("Lookup key column.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-value-column")
|
||||
@ -111,38 +61,12 @@ public class SimpleCsvFileLookupService extends AbstractControllerService implem
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor IGNORE_DUPLICATES =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("ignore-duplicates")
|
||||
.displayName("Ignore Duplicates")
|
||||
.description("Ignore duplicate keys for records in the CSV file.")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private volatile ConcurrentMap<String, String> cache;
|
||||
|
||||
private volatile String csvFile;
|
||||
|
||||
private volatile CSVFormat csvFormat;
|
||||
|
||||
private volatile String charset;
|
||||
|
||||
private volatile String lookupKeyColumn;
|
||||
|
||||
private volatile String lookupValueColumn;
|
||||
|
||||
private volatile boolean ignoreDuplicates;
|
||||
|
||||
private volatile SynchronousFileWatcher watcher;
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private void loadCache() throws IllegalStateException, IOException {
|
||||
@Override
|
||||
protected void loadCache() throws IllegalStateException, IOException {
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
final ComponentLog logger = getLogger();
|
||||
@ -181,31 +105,15 @@ public class SimpleCsvFileLookupService extends AbstractControllerService implem
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(CSV_FILE);
|
||||
properties.add(CSV_FORMAT);
|
||||
properties.add(CHARSET);
|
||||
properties.add(LOOKUP_KEY_COLUMN);
|
||||
protected void init(final ControllerServiceInitializationContext context) {
|
||||
super.init(context);
|
||||
properties.add(LOOKUP_VALUE_COLUMN);
|
||||
properties.add(IGNORE_DUPLICATES);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException {
|
||||
this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
|
||||
this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
|
||||
this.charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
|
||||
this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
|
||||
public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
|
||||
super.onEnabled(context);
|
||||
this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions().getValue();
|
||||
this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
|
||||
this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
|
||||
try {
|
||||
loadCache();
|
||||
} catch (final IllegalStateException e) {
|
||||
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.csv.CSVUtils;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
@ -29,6 +30,7 @@ import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
@ -86,5 +88,28 @@ public class TestCSVRecordLookupService {
|
||||
assertThat(property1.get().getAsString("created_at"), is("2017-04-01"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCsvRecordLookupServiceWithCustomSeparatorQuotedEscaped() throws InitializationException, LookupFailureException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final CSVRecordLookupService service = new CSVRecordLookupService();
|
||||
|
||||
runner.addControllerService("csv-file-lookup-service", service);
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.CSV_FORMAT, "custom");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.CSV_FILE, "src/test/resources/test_sep_escape_comment.csv");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_KEY_COLUMN, "key");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_VALUE_COLUMN, "value");
|
||||
runner.setProperty(service, CSVUtils.VALUE_SEPARATOR, "|");
|
||||
runner.setProperty(service, CSVUtils.QUOTE_CHAR, "\"");
|
||||
runner.setProperty(service, CSVUtils.ESCAPE_CHAR, "%");
|
||||
runner.setProperty(service, CSVUtils.COMMENT_MARKER, "#");
|
||||
runner.setProperty(service, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_ALL);
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
final Optional<Record> my_key = service.lookup(Collections.singletonMap("key", "my_key"));
|
||||
assertTrue(my_key.isPresent());
|
||||
assertEquals("my_value with an escaped |.", my_key.get().getAsString("value"));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.csv.CSVUtils;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
@ -83,4 +84,26 @@ public class TestSimpleCsvFileLookupService {
|
||||
assertThat(property1.isPresent(), is(true));
|
||||
assertThat(property1.get(), is("this is property \uff11"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleCsvFileLookupServiceWithCustomSeparatorQuotedEscaped() throws InitializationException, LookupFailureException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
|
||||
|
||||
runner.addControllerService("csv-file-lookup-service", service);
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.CSV_FORMAT, "custom");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.CSV_FILE, "src/test/resources/test_sep_escape_comment.csv");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_KEY_COLUMN, "key");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_VALUE_COLUMN, "value");
|
||||
runner.setProperty(service, CSVUtils.VALUE_SEPARATOR, "|");
|
||||
runner.setProperty(service, CSVUtils.QUOTE_CHAR, "\"");
|
||||
runner.setProperty(service, CSVUtils.ESCAPE_CHAR, "%");
|
||||
runner.setProperty(service, CSVUtils.COMMENT_MARKER, "#");
|
||||
runner.setProperty(service, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_ALL);
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
final Optional<String> value = service.lookup(Collections.singletonMap("key", "my_key"));
|
||||
assertEquals(Optional.of("my_value with an escaped |."), value);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,3 @@
|
||||
"key"|"value"|"created_at"
|
||||
# This is a remarkably nice comment
|
||||
"my_key"|"my_value with an escaped %|."|"2020-07-30"
|
Can't render this file because it has a wrong number of fields in line 2.
|
Loading…
x
Reference in New Issue
Block a user