diff --git a/docs/reference/ingest/processors/csv.asciidoc b/docs/reference/ingest/processors/csv.asciidoc
index 9efad89fd45..505bd14162a 100644
--- a/docs/reference/ingest/processors/csv.asciidoc
+++ b/docs/reference/ingest/processors/csv.asciidoc
@@ -13,6 +13,8 @@ Extracts fields from CSV line out of a single text field within a document. Any
| `quote` | no | " | Quote used in CSV, has to be single character string
| `ignore_missing` | no | `true` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `trim` | no | `false` | Trim whitespaces in unquoted fields
+| `empty_value` | no | - | Value used to fill empty fields, empty fields will be skipped if this is not provided.
+ Empty field is one with no value (2 consecutive separators) or empty quotes (`""`)
include::common-options.asciidoc[]
|======
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java
index 077d12684e9..0997ecbb2a7 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvParser.java
@@ -36,6 +36,7 @@ final class CsvParser {
private final char separator;
private final boolean trim;
private final String[] headers;
+ private final Object emptyValue;
private final IngestDocument ingestDocument;
private final StringBuilder builder = new StringBuilder();
private State state = State.START;
@@ -45,12 +46,13 @@ final class CsvParser {
private int length;
private int currentIndex;
- CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) {
+ CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers, Object emptyValue) {
this.ingestDocument = ingestDocument;
this.quote = quote;
this.separator = separator;
this.trim = trim;
this.headers = headers;
+ this.emptyValue = emptyValue;
}
void process(String line) {
@@ -102,7 +104,8 @@ final class CsvParser {
return false;
} else if (c == separator) {
startIndex++;
- if (nextHeader()) {
+ builder.setLength(0);
+ if (setField(startIndex)) {
return true;
}
} else if (isWhitespace(c)) {
@@ -190,16 +193,17 @@ final class CsvParser {
}
private boolean setField(int endIndex) {
+ String value;
if (builder.length() == 0) {
- ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex));
+ value = line.substring(startIndex, endIndex);
} else {
- builder.append(line, startIndex, endIndex);
- ingestDocument.setFieldValue(headers[currentHeader], builder.toString());
+ value = builder.append(line, startIndex, endIndex).toString();
+ }
+ if (value.length() > 0) {
+ ingestDocument.setFieldValue(headers[currentHeader], value);
+ } else if (emptyValue != null) {
+ ingestDocument.setFieldValue(headers[currentHeader], emptyValue);
}
- return nextHeader();
- }
-
- private boolean nextHeader() {
currentHeader++;
return currentHeader == headers.length;
}
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java
index 746a800b764..949f5fdf7b4 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java
@@ -32,13 +32,14 @@ import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationExcept
* A processor that breaks line from CSV file into separate fields.
* If there's more fields requested than there is in the CSV, extra field will not be present in the document after processing.
* In the same way this processor will skip any field that is empty in CSV.
- *
+ *
* By default it uses rules according to RCF 4180 with one exception: whitespaces are
* allowed before or after quoted field. Processor can be tweaked with following parameters:
- *
+ *
* quote: set custom quote character (defaults to ")
* separator: set custom separator (defaults to ,)
* trim: trim leading and trailing whitespaces in unquoted fields
+ * empty_value: sets custom value to use for empty fields (field is skipped if null)
*/
public final class CsvProcessor extends AbstractProcessor {
@@ -50,8 +51,10 @@ public final class CsvProcessor extends AbstractProcessor {
private final char quote;
private final char separator;
private final boolean ignoreMissing;
+ private final Object emptyValue;
- CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) {
+ CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing,
+ Object emptyValue) {
super(tag);
this.field = field;
this.headers = headers;
@@ -59,6 +62,7 @@ public final class CsvProcessor extends AbstractProcessor {
this.quote = quote;
this.separator = separator;
this.ignoreMissing = ignoreMissing;
+ this.emptyValue = emptyValue;
}
@Override
@@ -73,7 +77,7 @@ public final class CsvProcessor extends AbstractProcessor {
} else if (line == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}
- new CsvParser(ingestDocument, quote, separator, trim, headers).process(line);
+ new CsvParser(ingestDocument, quote, separator, trim, headers, emptyValue).process(line);
return ingestDocument;
}
@@ -96,13 +100,17 @@ public final class CsvProcessor extends AbstractProcessor {
throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be single character like , or ;");
}
boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false);
+ Object emptyValue = null;
+ if(config.containsKey("emptyValue")){
+ emptyValue = ConfigurationUtils.readObject(TYPE, processorTag, config, "empty_value");
+ }
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
List targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields");
if (targetFields.isEmpty()) {
throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty");
}
return new CsvProcessor(processorTag, field, targetFields.toArray(new String[0]), trim, separator.charAt(0), quote.charAt(0),
- ignoreMissing);
+ ignoreMissing, emptyValue);
}
}
}
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java
index 9e0a2b683e3..bfde04555e6 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CsvProcessorTests.java
@@ -53,7 +53,7 @@ public class CsvProcessorTests extends ESTestCase {
separator = randomFrom(SEPARATORS);
}
- public void testExactNumberOfFields() throws Exception {
+ public void testExactNumberOfFields() {
int numItems = randomIntBetween(2, 10);
Map items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) {
@@ -67,7 +67,67 @@ public class CsvProcessorTests extends ESTestCase {
items.forEach((key, value) -> assertEquals(value, ingestDocument.getFieldValue(key, String.class)));
}
- public void testLessFieldsThanHeaders() throws Exception {
+ public void testEmptyValues() {
+ int numItems = randomIntBetween(5, 10);
+ Map items = new LinkedHashMap<>();
+ for (int i = 0; i < 3; i++) {
+ items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
+ }
+ String emptyKey = randomAlphaOfLengthBetween(5, 10);
+ items.put(emptyKey, "");
+ for (int i = 0; i < numItems - 4; i++) {
+ items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
+ }
+ String[] headers = items.keySet().toArray(new String[numItems]);
+ String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));
+
+ IngestDocument ingestDocument = processDocument(headers, csv);
+
+ items.forEach((key, value) -> {
+ if (emptyKey.equals(key)) {
+ assertFalse(ingestDocument.hasField(key));
+ } else {
+ assertEquals(value, ingestDocument.getFieldValue(key, String.class));
+ }
+ });
+ }
+
+ public void testEmptyValuesReplace() {
+ int numItems = randomIntBetween(5, 10);
+ Map items = new LinkedHashMap<>();
+ for (int i = 0; i < 3; i++) {
+ items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
+ }
+ String emptyKey = randomAlphaOfLengthBetween(5, 10);
+ items.put(emptyKey, "");
+ for (int i = 0; i < numItems - 4; i++) {
+ items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
+ }
+ String[] headers = items.keySet().toArray(new String[numItems]);
+ String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));
+
+ IngestDocument ingestDocument = processDocument(headers, csv, true, "");
+
+ items.forEach((key, value) -> {
+ if (emptyKey.equals(key)) {
+ assertEquals("", ingestDocument.getFieldValue(key, String.class));
+ } else {
+ assertEquals(value, ingestDocument.getFieldValue(key, String.class));
+ }
+ });
+
+ IngestDocument ingestDocument2 = processDocument(headers, csv, true, 0);
+
+ items.forEach((key, value) -> {
+ if (emptyKey.equals(key)) {
+ assertEquals(0, (int) ingestDocument2.getFieldValue(key, Integer.class));
+ } else {
+ assertEquals(value, ingestDocument2.getFieldValue(key, String.class));
+ }
+ });
+ }
+
+ public void testLessFieldsThanHeaders() {
int numItems = randomIntBetween(4, 10);
Map items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) {
@@ -82,7 +142,7 @@ public class CsvProcessorTests extends ESTestCase {
items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class)));
}
- public void testLessHeadersThanFields() throws Exception {
+ public void testLessHeadersThanFields() {
int numItems = randomIntBetween(5, 10);
Map items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) {
@@ -96,7 +156,7 @@ public class CsvProcessorTests extends ESTestCase {
items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(), ingestDocument.getFieldValue(e.getKey(), String.class)));
}
- public void testSingleField() throws Exception {
+ public void testSingleField() {
String[] headers = new String[]{randomAlphaOfLengthBetween(5, 10)};
String value = randomAlphaOfLengthBetween(5, 10);
String csv = quote + value + quote;
@@ -106,7 +166,7 @@ public class CsvProcessorTests extends ESTestCase {
assertEquals(value, ingestDocument.getFieldValue(headers[0], String.class));
}
- public void testEscapedQuote() throws Exception {
+ public void testEscapedQuote() {
int numItems = randomIntBetween(2, 10);
Map items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) {
@@ -121,7 +181,7 @@ public class CsvProcessorTests extends ESTestCase {
items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote), ingestDocument.getFieldValue(key, String.class)));
}
- public void testQuotedStrings() throws Exception {
+ public void testQuotedStrings() {
assumeFalse("quote needed", quote.isEmpty());
int numItems = randomIntBetween(2, 10);
Map items = new LinkedHashMap<>();
@@ -138,7 +198,7 @@ public class CsvProcessorTests extends ESTestCase {
String.class)));
}
- public void testEmptyFields() throws Exception {
+ public void testEmptyFields() {
int numItems = randomIntBetween(5, 10);
Map items = new LinkedHashMap<>();
for (int i = 0; i < numItems; i++) {
@@ -167,7 +227,7 @@ public class CsvProcessorTests extends ESTestCase {
expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\rabc"));
}
- public void testQuotedWhitespaces() throws Exception {
+ public void testQuotedWhitespaces() {
assumeFalse("quote needed", quote.isEmpty());
IngestDocument document = processDocument(new String[]{"a", "b", "c", "d"},
" abc " + separator + " def" + separator + "ghi " + separator + " " + quote + " ooo " + quote);
@@ -177,7 +237,7 @@ public class CsvProcessorTests extends ESTestCase {
assertEquals(" ooo ", document.getFieldValue("d", String.class));
}
- public void testUntrimmed() throws Exception {
+ public void testUntrimmed() {
assumeFalse("quote needed", quote.isEmpty());
IngestDocument document = processDocument(new String[]{"a", "b", "c", "d", "e", "f"},
" abc " + separator + " def" + separator + "ghi " + separator + " "
@@ -197,9 +257,9 @@ public class CsvProcessorTests extends ESTestCase {
if (ingestDocument.hasField(fieldName)) {
ingestDocument.removeField(fieldName);
}
- CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', true);
+ CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', true, null);
processor.execute(ingestDocument);
- CsvProcessor processor2 = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', false);
+ CsvProcessor processor2 = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[]{"a"}, false, ',', '"', false, null);
expectThrows(IllegalArgumentException.class, () -> processor2.execute(ingestDocument));
}
@@ -209,24 +269,29 @@ public class CsvProcessorTests extends ESTestCase {
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "abc,abc");
HashMap metadata = new HashMap<>(ingestDocument.getSourceAndMetadata());
- CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false);
+ CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false, null);
processor.execute(ingestDocument);
assertEquals(metadata, ingestDocument.getSourceAndMetadata());
}
- private IngestDocument processDocument(String[] headers, String csv) throws Exception {
+ private IngestDocument processDocument(String[] headers, String csv) {
return processDocument(headers, csv, true);
}
- private IngestDocument processDocument(String[] headers, String csv, boolean trim) throws Exception {
+ private IngestDocument processDocument(String[] headers, String csv, boolean trim) {
+ return processDocument(headers, csv, trim, null);
+ }
+
+ private IngestDocument processDocument(String[] headers, String csv, boolean trim, Object emptyValue) {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Arrays.stream(headers).filter(ingestDocument::hasField).forEach(ingestDocument::removeField);
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, csv);
char quoteChar = quote.isEmpty() ? '"' : quote.charAt(0);
- CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false);
+ CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false,
+ emptyValue);
processor.execute(ingestDocument);