Add empty_value parameter to CSV processor (#51567) (#51966)

* Add empty_value parameter to CSV processor

This change adds `empty_value` parameter to the CSV processor.
This value is used to fill empty fields. Fields will be skipped
if this parameter is ommited. This behavior is the same for both
quoted and unquoted fields.

* docs updated

* Fix compilation problem

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Przemko Robakowski 2020-02-05 23:35:52 +01:00 committed by GitHub
parent 69c1ecbf2b
commit 6332de40b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 29 deletions

View File

@ -13,6 +13,8 @@ Extracts fields from CSV line out of a single text field within a document. Any
| `quote` | no | " | Quote used in CSV, has to be single character string | `quote` | no | " | Quote used in CSV, has to be single character string
| `ignore_missing` | no | `true` | If `true` and `field` does not exist, the processor quietly exits without modifying the document | `ignore_missing` | no | `true` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `trim` | no | `false` | Trim whitespaces in unquoted fields | `trim` | no | `false` | Trim whitespaces in unquoted fields
| `empty_value` | no | - | Value used to fill empty fields, empty fields will be skipped if this is not provided.
Empty field is one with no value (2 consecutive separators) or empty quotes (`""`)
include::common-options.asciidoc[] include::common-options.asciidoc[]
|====== |======

View File

@ -36,6 +36,7 @@ final class CsvParser {
private final char separator; private final char separator;
private final boolean trim; private final boolean trim;
private final String[] headers; private final String[] headers;
private final Object emptyValue;
private final IngestDocument ingestDocument; private final IngestDocument ingestDocument;
private final StringBuilder builder = new StringBuilder(); private final StringBuilder builder = new StringBuilder();
private State state = State.START; private State state = State.START;
@ -45,12 +46,13 @@ final class CsvParser {
private int length; private int length;
private int currentIndex; private int currentIndex;
CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) { CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers, Object emptyValue) {
this.ingestDocument = ingestDocument; this.ingestDocument = ingestDocument;
this.quote = quote; this.quote = quote;
this.separator = separator; this.separator = separator;
this.trim = trim; this.trim = trim;
this.headers = headers; this.headers = headers;
this.emptyValue = emptyValue;
} }
void process(String line) { void process(String line) {
@ -102,7 +104,8 @@ final class CsvParser {
return false; return false;
} else if (c == separator) { } else if (c == separator) {
startIndex++; startIndex++;
if (nextHeader()) { builder.setLength(0);
if (setField(startIndex)) {
return true; return true;
} }
} else if (isWhitespace(c)) { } else if (isWhitespace(c)) {
@ -190,16 +193,17 @@ final class CsvParser {
} }
private boolean setField(int endIndex) { private boolean setField(int endIndex) {
String value;
if (builder.length() == 0) { if (builder.length() == 0) {
ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex)); value = line.substring(startIndex, endIndex);
} else { } else {
builder.append(line, startIndex, endIndex); value = builder.append(line, startIndex, endIndex).toString();
ingestDocument.setFieldValue(headers[currentHeader], builder.toString());
} }
return nextHeader(); if (value.length() > 0) {
ingestDocument.setFieldValue(headers[currentHeader], value);
} else if (emptyValue != null) {
ingestDocument.setFieldValue(headers[currentHeader], emptyValue);
} }
private boolean nextHeader() {
currentHeader++; currentHeader++;
return currentHeader == headers.length; return currentHeader == headers.length;
} }

View File

@ -32,13 +32,14 @@ import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationExcept
* A processor that breaks line from CSV file into separate fields. * A processor that breaks line from CSV file into separate fields.
* If there's more fields requested than there is in the CSV, extra field will not be present in the document after processing. * If there's more fields requested than there is in the CSV, extra field will not be present in the document after processing.
* In the same way this processor will skip any field that is empty in CSV. * In the same way this processor will skip any field that is empty in CSV.
* * <p>
* By default it uses rules according to <a href="https://tools.ietf.org/html/rfc4180">RCF 4180</a> with one exception: whitespaces are * By default it uses rules according to <a href="https://tools.ietf.org/html/rfc4180">RCF 4180</a> with one exception: whitespaces are
* allowed before or after quoted field. Processor can be tweaked with following parameters: * allowed before or after quoted field. Processor can be tweaked with following parameters:
* * <p>
* quote: set custom quote character (defaults to ") * quote: set custom quote character (defaults to ")
* separator: set custom separator (defaults to ,) * separator: set custom separator (defaults to ,)
* trim: trim leading and trailing whitespaces in unquoted fields * trim: trim leading and trailing whitespaces in unquoted fields
* empty_value: sets custom value to use for empty fields (field is skipped if null)
*/ */
public final class CsvProcessor extends AbstractProcessor { public final class CsvProcessor extends AbstractProcessor {
@ -50,8 +51,10 @@ public final class CsvProcessor extends AbstractProcessor {
private final char quote; private final char quote;
private final char separator; private final char separator;
private final boolean ignoreMissing; private final boolean ignoreMissing;
private final Object emptyValue;
CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) { CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing,
Object emptyValue) {
super(tag); super(tag);
this.field = field; this.field = field;
this.headers = headers; this.headers = headers;
@ -59,6 +62,7 @@ public final class CsvProcessor extends AbstractProcessor {
this.quote = quote; this.quote = quote;
this.separator = separator; this.separator = separator;
this.ignoreMissing = ignoreMissing; this.ignoreMissing = ignoreMissing;
this.emptyValue = emptyValue;
} }
@Override @Override
@ -73,7 +77,7 @@ public final class CsvProcessor extends AbstractProcessor {
} else if (line == null) { } else if (line == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
} }
new CsvParser(ingestDocument, quote, separator, trim, headers).process(line); new CsvParser(ingestDocument, quote, separator, trim, headers, emptyValue).process(line);
return ingestDocument; return ingestDocument;
} }
@ -96,13 +100,17 @@ public final class CsvProcessor extends AbstractProcessor {
throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be single character like , or ;"); throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be single character like , or ;");
} }
boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false); boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false);
Object emptyValue = null;
if(config.containsKey("emptyValue")){
emptyValue = ConfigurationUtils.readObject(TYPE, processorTag, config, "empty_value");
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
List<String> targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields"); List<String> targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields");
if (targetFields.isEmpty()) { if (targetFields.isEmpty()) {
throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty"); throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty");
} }
return new CsvProcessor(processorTag, field, targetFields.toArray(new String[0]), trim, separator.charAt(0), quote.charAt(0), return new CsvProcessor(processorTag, field, targetFields.toArray(new String[0]), trim, separator.charAt(0), quote.charAt(0),
ignoreMissing); ignoreMissing, emptyValue);
} }
} }
} }

View File

@ -53,7 +53,7 @@ public class CsvProcessorTests extends ESTestCase {
separator = randomFrom(SEPARATORS); separator = randomFrom(SEPARATORS);
} }
public void testExactNumberOfFields() throws Exception { public void testExactNumberOfFields() {
int numItems = randomIntBetween(2, 10); int numItems = randomIntBetween(2, 10);
Map<String, String> items = new LinkedHashMap<>(); Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) { for (int i = 0; i < numItems; i++) {
@ -67,7 +67,67 @@ public class CsvProcessorTests extends ESTestCase {
items.forEach((key, value) -> assertEquals(value, ingestDocument.getFieldValue(key, String.class))); items.forEach((key, value) -> assertEquals(value, ingestDocument.getFieldValue(key, String.class)));
} }
public void testLessFieldsThanHeaders() throws Exception { public void testEmptyValues() {
int numItems = randomIntBetween(5, 10);
Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < 3; i++) {
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
}
String emptyKey = randomAlphaOfLengthBetween(5, 10);
items.put(emptyKey, "");
for (int i = 0; i < numItems - 4; i++) {
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
}
String[] headers = items.keySet().toArray(new String[numItems]);
String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));
IngestDocument ingestDocument = processDocument(headers, csv);
items.forEach((key, value) -> {
if (emptyKey.equals(key)) {
assertFalse(ingestDocument.hasField(key));
} else {
assertEquals(value, ingestDocument.getFieldValue(key, String.class));
}
});
}
public void testEmptyValuesReplace() {
int numItems = randomIntBetween(5, 10);
Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < 3; i++) {
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
}
String emptyKey = randomAlphaOfLengthBetween(5, 10);
items.put(emptyKey, "");
for (int i = 0; i < numItems - 4; i++) {
items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
}
String[] headers = items.keySet().toArray(new String[numItems]);
String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));
IngestDocument ingestDocument = processDocument(headers, csv, true, "");
items.forEach((key, value) -> {
if (emptyKey.equals(key)) {
assertEquals("", ingestDocument.getFieldValue(key, String.class));
} else {
assertEquals(value, ingestDocument.getFieldValue(key, String.class));
}
});
IngestDocument ingestDocument2 = processDocument(headers, csv, true, 0);
items.forEach((key, value) -> {
if (emptyKey.equals(key)) {
assertEquals(0, (int) ingestDocument2.getFieldValue(key, Integer.class));
} else {
assertEquals(value, ingestDocument2.getFieldValue(key, String.class));
}
});
}
public void testLessFieldsThanHeaders() {
int numItems = randomIntBetween(4, 10); int numItems = randomIntBetween(4, 10);
Map<String, String> items = new LinkedHashMap<>(); Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) { for (int i = 0; i < numItems; i++) {
@ -82,7 +142,7 @@ public class CsvProcessorTests extends ESTestCase {
items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class))); items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class)));
} }
public void testLessHeadersThanFields() throws Exception { public void testLessHeadersThanFields() {
int numItems = randomIntBetween(5, 10); int numItems = randomIntBetween(5, 10);
Map<String, String> items = new LinkedHashMap<>(); Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) { for (int i = 0; i < numItems; i++) {
@ -96,7 +156,7 @@ public class CsvProcessorTests extends ESTestCase {
items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class))); items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class)));
} }
public void testSingleField() throws Exception { public void testSingleField() {
String[] headers = new String[]{randomAlphaOfLengthBetween(5, 10)}; String[] headers = new String[]{randomAlphaOfLengthBetween(5, 10)};
String value = randomAlphaOfLengthBetween(5, 10); String value = randomAlphaOfLengthBetween(5, 10);
String csv = quote + value + quote; String csv = quote + value + quote;
@ -106,7 +166,7 @@ public class CsvProcessorTests extends ESTestCase {
assertEquals(value, ingestDocument.getFieldValue(headers[0], String.class)); assertEquals(value, ingestDocument.getFieldValue(headers[0], String.class));
} }
public void testEscapedQuote() throws Exception { public void testEscapedQuote() {
int numItems = randomIntBetween(2, 10); int numItems = randomIntBetween(2, 10);
Map<String, String> items = new LinkedHashMap<>(); Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) { for (int i = 0; i < numItems; i++) {
@ -121,7 +181,7 @@ public class CsvProcessorTests extends ESTestCase {
items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote), ingestDocument.getFieldValue(key, String.class))); items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote), ingestDocument.getFieldValue(key, String.class)));
} }
public void testQuotedStrings() throws Exception { public void testQuotedStrings() {
assumeFalse("quote needed", quote.isEmpty()); assumeFalse("quote needed", quote.isEmpty());
int numItems = randomIntBetween(2, 10); int numItems = randomIntBetween(2, 10);
Map<String, String> items = new LinkedHashMap<>(); Map<String, String> items = new LinkedHashMap<>();
@ -138,7 +198,7 @@ public class CsvProcessorTests extends ESTestCase {
String.class))); String.class)));
} }
public void testEmptyFields() throws Exception { public void testEmptyFields() {
int numItems = randomIntBetween(5, 10); int numItems = randomIntBetween(5, 10);
Map<String, String> items = new LinkedHashMap<>(); Map<String, String> items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) { for (int i = 0; i < numItems; i++) {
@ -167,7 +227,7 @@ public class CsvProcessorTests extends ESTestCase {
expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\rabc")); expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\rabc"));
} }
public void testQuotedWhitespaces() throws Exception { public void testQuotedWhitespaces() {
assumeFalse("quote needed", quote.isEmpty()); assumeFalse("quote needed", quote.isEmpty());
IngestDocument document = processDocument(new String[]{"a", "b", "c", "d"}, IngestDocument document = processDocument(new String[]{"a", "b", "c", "d"},
" abc " + separator + " def" + separator + "ghi " + separator + " " + quote + " ooo " + quote); " abc " + separator + " def" + separator + "ghi " + separator + " " + quote + " ooo " + quote);
@ -177,7 +237,7 @@ public class CsvProcessorTests extends ESTestCase {
assertEquals(" ooo ", document.getFieldValue("d", String.class)); assertEquals(" ooo ", document.getFieldValue("d", String.class));
} }
public void testUntrimmed() throws Exception { public void testUntrimmed() {
assumeFalse("quote needed", quote.isEmpty()); assumeFalse("quote needed", quote.isEmpty());
IngestDocument document = processDocument(new String[]{"a", "b", "c", "d", "e", "f"}, IngestDocument document = processDocument(new String[]{"a", "b", "c", "d", "e", "f"},
" abc " + separator + " def" + separator + "ghi " + separator + " " " abc " + separator + " def" + separator + "ghi " + separator + " "
@ -197,9 +257,9 @@ public class CsvProcessorTests extends ESTestCase {
if (ingestDocument.hasField(fieldName)) { if (ingestDocument.hasField(fieldName)) {
ingestDocument.removeField(fieldName); ingestDocument.removeField(fieldName);
} }
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', true); CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', true, null);
processor.execute(ingestDocument); processor.execute(ingestDocument);
CsvProcessor processor2 = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', false); CsvProcessor processor2 = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', false, null);
expectThrows(IllegalArgumentException.class, () -> processor2.execute(ingestDocument)); expectThrows(IllegalArgumentException.class, () -> processor2.execute(ingestDocument));
} }
@ -209,24 +269,29 @@ public class CsvProcessorTests extends ESTestCase {
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "abc,abc"); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "abc,abc");
HashMap<String, Object> metadata = new HashMap<>(ingestDocument.getSourceAndMetadata()); HashMap<String, Object> metadata = new HashMap<>(ingestDocument.getSourceAndMetadata());
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false); CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false, null);
processor.execute(ingestDocument); processor.execute(ingestDocument);
assertEquals(metadata, ingestDocument.getSourceAndMetadata()); assertEquals(metadata, ingestDocument.getSourceAndMetadata());
} }
private IngestDocument processDocument(String[] headers, String csv) throws Exception { private IngestDocument processDocument(String[] headers, String csv) {
return processDocument(headers, csv, true); return processDocument(headers, csv, true);
} }
private IngestDocument processDocument(String[] headers, String csv, boolean trim) throws Exception { private IngestDocument processDocument(String[] headers, String csv, boolean trim) {
return processDocument(headers, csv, trim, null);
}
private IngestDocument processDocument(String[] headers, String csv, boolean trim, Object emptyValue) {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Arrays.stream(headers).filter(ingestDocument::hasField).forEach(ingestDocument::removeField); Arrays.stream(headers).filter(ingestDocument::hasField).forEach(ingestDocument::removeField);
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, csv); String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, csv);
char quoteChar = quote.isEmpty() ? '"' : quote.charAt(0); char quoteChar = quote.isEmpty() ? '"' : quote.charAt(0);
CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false); CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false,
emptyValue);
processor.execute(ingestDocument); processor.execute(ingestDocument);