[7.x] CSV ingest processor (#49509) (#50083)

* CSV ingest processor (#49509)

This change adds a new ingest processor that breaks a line from a CSV file into separate fields.
By default it conforms to RFC 4180, but its behavior can be tweaked.

Closes #49113
Przemko Robakowski 2019-12-11 23:06:05 +01:00 committed by GitHub
parent 23ab9e0204
commit 4619834b97
7 changed files with 734 additions and 0 deletions


@@ -829,6 +829,7 @@ include::processors/append.asciidoc[]
include::processors/bytes.asciidoc[]
include::processors/circle.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/csv.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]


@@ -0,0 +1,33 @@
[[csv-processor]]
=== CSV Processor
Extracts fields from a CSV line contained in a single text field within a document. Any empty field in the CSV line is skipped.
[[csv-options]]
.CSV Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to extract data from
| `target_fields` | yes | - | The array of fields to assign the extracted values to
| `separator` | no | , | Separator used in CSV, must be a single-character string
| `quote` | no | " | Quote used in CSV, must be a single-character string
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `trim` | no | `false` | Trim whitespace in unquoted fields
include::common-options.asciidoc[]
|======
[source,js]
--------------------------------------------------
{
  "csv": {
    "field": "my_field",
    "target_fields": ["field1", "field2"]
  }
}
--------------------------------------------------
// NOTCONSOLE
If the `trim` option is enabled, any leading and trailing whitespace in unquoted fields is trimmed.
For example, with the configuration above and `trim` left disabled, a value of `A, B` results in field `field2`
having the value `{nbsp}B` (with a leading space). With `trim` enabled, the same value results in field `field2`
having the value `B` (no whitespace). Quoted fields are left untouched.
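
For readers building the processor programmatically (for example in a test), the options above map one-to-one onto the factory's config map. A minimal sketch, assuming it lives in the `org.elasticsearch.ingest.common` package so it can see the package-private `CsvProcessor`; the class name `CsvConfigExample` is made up for illustration:

[source,java]
--------------------------------------------------
package org.elasticsearch.ingest.common;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CsvConfigExample {
    public static CsvProcessor build() {
        Map<String, Object> config = new HashMap<>();
        config.put("field", "my_field");
        config.put("target_fields", Arrays.asList("field1", "field2"));
        config.put("trim", true); // optional, defaults to false
        // The factory consumes and validates the map; a zero-length or
        // multi-character "quote"/"separator" raises a configuration exception.
        return new CsvProcessor.Factory().create(Collections.emptyMap(), "my-tag", config);
    }
}
--------------------------------------------------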


@@ -0,0 +1,206 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.IngestDocument;

final class CsvParser {

    private static final char LF = '\n';
    private static final char CR = '\r';
    private static final char SPACE = ' ';
    private static final char TAB = '\t';

    // The parser is a small state machine driven character by character:
    // START      - at the beginning of a field, before we know whether it is quoted
    // UNQUOTED   - inside an unquoted field
    // QUOTED     - inside a quoted field
    // QUOTED_END - just past the closing quote, skipping whitespace until the separator
    private enum State {
        START, UNQUOTED, QUOTED, QUOTED_END
    }

    private final char quote;
    private final char separator;
    private final boolean trim;
    private final String[] headers;
    private final IngestDocument ingestDocument;
    private final StringBuilder builder = new StringBuilder();
    private State state = State.START;
    private String line;
    private int currentHeader = 0;
    private int startIndex = 0;
    private int length;
    private int currentIndex;

    CsvParser(IngestDocument ingestDocument, char quote, char separator, boolean trim, String[] headers) {
        this.ingestDocument = ingestDocument;
        this.quote = quote;
        this.separator = separator;
        this.trim = trim;
        this.headers = headers;
    }

    void process(String line) {
        this.line = line;
        length = line.length();
        for (currentIndex = 0; currentIndex < length; currentIndex++) {
            switch (state) {
                case START:
                    if (processStart()) {
                        return;
                    }
                    break;
                case UNQUOTED:
                    if (processUnquoted()) {
                        return;
                    }
                    break;
                case QUOTED:
                    processQuoted();
                    break;
                case QUOTED_END:
                    if (processQuotedEnd()) {
                        return;
                    }
                    break;
            }
        }
        // we've reached the end of the string, we need to handle the last field
        switch (state) {
            case UNQUOTED:
                setField(length);
                break;
            case QUOTED_END:
                setField(length - 1);
                break;
            case QUOTED:
                throw new IllegalArgumentException("Unmatched quote");
        }
    }

    private boolean processStart() {
        for (; currentIndex < length; currentIndex++) {
            char c = currentChar();
            if (c == quote) {
                state = State.QUOTED;
                builder.setLength(0);
                startIndex = currentIndex + 1;
                return false;
            } else if (c == separator) {
                // empty field: skip it and move on to the next header
                startIndex++;
                if (nextHeader()) {
                    return true;
                }
            } else if (isWhitespace(c)) {
                if (trim) {
                    startIndex++;
                }
            } else {
                state = State.UNQUOTED;
                builder.setLength(0);
                return false;
            }
        }
        return true;
    }

    private boolean processUnquoted() {
        int spaceCount = 0;
        for (; currentIndex < length; currentIndex++) {
            char c = currentChar();
            if (c == LF || c == CR || c == quote) {
                throw new IllegalArgumentException("Illegal character inside unquoted field at " + currentIndex);
            } else if (trim && isWhitespace(c)) {
                spaceCount++;
            } else if (c == separator) {
                state = State.START;
                if (setField(currentIndex - spaceCount)) {
                    return true;
                }
                startIndex = currentIndex + 1;
                return false;
            } else {
                spaceCount = 0;
            }
        }
        return false;
    }

    private void processQuoted() {
        for (; currentIndex < length; currentIndex++) {
            if (currentChar() == quote) {
                state = State.QUOTED_END;
                break;
            }
        }
    }

    private boolean processQuotedEnd() {
        char c = currentChar();
        if (c == quote) {
            // doubled quote: an escaped quote character inside a quoted field
            builder.append(line, startIndex, currentIndex - 1).append(quote);
            startIndex = currentIndex + 1;
            state = State.QUOTED;
            return false;
        }
        boolean shouldSetField = true;
        for (; currentIndex < length; currentIndex++) {
            c = currentChar();
            if (isWhitespace(c)) {
                if (shouldSetField) {
                    if (setField(currentIndex - 1)) {
                        return true;
                    }
                    shouldSetField = false;
                }
            } else if (c == separator) {
                if (shouldSetField && setField(currentIndex - 1)) {
                    return true;
                }
                startIndex = currentIndex + 1;
                state = State.START;
                return false;
            } else {
                throw new IllegalArgumentException("character '" + c + "' after quoted field at " + currentIndex);
            }
        }
        return true;
    }

    private char currentChar() {
        return line.charAt(currentIndex);
    }

    private boolean isWhitespace(char c) {
        return c == SPACE || c == TAB;
    }

    private boolean setField(int endIndex) {
        if (builder.length() == 0) {
            ingestDocument.setFieldValue(headers[currentHeader], line.substring(startIndex, endIndex));
        } else {
            builder.append(line, startIndex, endIndex);
            ingestDocument.setFieldValue(headers[currentHeader], builder.toString());
        }
        return nextHeader();
    }

    private boolean nextHeader() {
        currentHeader++;
        return currentHeader == headers.length;
    }
}
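
To make the quoted-field handling above concrete, here is a hedged trace of the escaped-quote path. It assumes same-package access and an `IngestDocument` named `doc` built by test scaffolding (for example `RandomDocumentPicks` in the tests below); the snippet is illustration, not part of this commit:

[source,java]
--------------------------------------------------
// Sketch only: "doc" is a pre-built IngestDocument from test scaffolding.
CsvParser parser = new CsvParser(doc, '"', ',', false, new String[]{"f1", "f2"});
parser.process("\"a\"\"b\", c");
// doc now holds f1 = a"b  (the doubled quote collapses to a single quote)
//          and  f2 = " c" (trim is false, so the leading space survives)
--------------------------------------------------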


@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

/**
 * A processor that breaks a line from a CSV file into separate fields.
 * If more fields are requested than are present in the CSV line, the extra fields will not be present in the document
 * after processing. In the same way, this processor skips any field that is empty in the CSV line.
 *
 * By default it follows the rules of <a href="https://tools.ietf.org/html/rfc4180">RFC 4180</a> with one exception: whitespace is
 * allowed before or after a quoted field. The processor can be tweaked with the following parameters:
 *
 * quote: set a custom quote character (defaults to ")
 * separator: set a custom separator (defaults to ,)
 * trim: trim leading and trailing whitespace in unquoted fields
 */
public final class CsvProcessor extends AbstractProcessor {

    public static final String TYPE = "csv";

    private final String field;
    private final String[] headers;
    private final boolean trim;
    private final char quote;
    private final char separator;
    private final boolean ignoreMissing;

    CsvProcessor(String tag, String field, String[] headers, boolean trim, char separator, char quote, boolean ignoreMissing) {
        super(tag);
        this.field = field;
        this.headers = headers;
        this.trim = trim;
        this.quote = quote;
        this.separator = separator;
        this.ignoreMissing = ignoreMissing;
    }

    @Override
    public IngestDocument execute(IngestDocument ingestDocument) {
        if (headers.length == 0) {
            return ingestDocument;
        }

        String line = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
        if (line == null && ignoreMissing) {
            // the field is absent or null and we were told to ignore that: leave the document untouched
            return ingestDocument;
        } else if (line == null) {
            throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
        }
        new CsvParser(ingestDocument, quote, separator, trim, headers).process(line);
        return ingestDocument;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements org.elasticsearch.ingest.Processor.Factory {
        @Override
        public CsvProcessor create(Map<String, org.elasticsearch.ingest.Processor.Factory> registry, String processorTag,
                                   Map<String, Object> config) {
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            String quote = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "quote", "\"");
            if (quote.length() != 1) {
                throw newConfigurationException(TYPE, processorTag, "quote", "quote has to be a single character like \" or '");
            }
            String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator", ",");
            if (separator.length() != 1) {
                throw newConfigurationException(TYPE, processorTag, "separator", "separator has to be a single character like , or ;");
            }
            boolean trim = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trim", false);
            boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
            List<String> targetFields = ConfigurationUtils.readList(TYPE, processorTag, config, "target_fields");
            if (targetFields.isEmpty()) {
                throw newConfigurationException(TYPE, processorTag, "target_fields", "target fields list can't be empty");
            }
            return new CsvProcessor(processorTag, field, targetFields.toArray(new String[0]), trim, separator.charAt(0),
                quote.charAt(0), ignoreMissing);
        }
    }
}
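
Given the `ignore_missing` handling in `execute` above, the contract can be summarized in a short hedged sketch (again assuming a scaffolded `IngestDocument` named `doc` whose source lacks the configured field):

[source,java]
--------------------------------------------------
// ignore_missing = true: the absent field is tolerated and the document
// passes through unmodified.
CsvProcessor lenient = new CsvProcessor("tag", "missing_field", new String[]{"a"}, false, ',', '"', true);
lenient.execute(doc);

// ignore_missing = false: getFieldValue raises for the absent field, so
// execute propagates an exception instead of silently skipping.
CsvProcessor strict = new CsvProcessor("tag", "missing_field", new String[]{"a"}, false, ',', '"', false);
// strict.execute(doc) -> throws
--------------------------------------------------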


@@ -88,6 +88,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
        processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
        processors.put(DropProcessor.TYPE, new DropProcessor.Factory());
        processors.put(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory());
        processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory());
        return Collections.unmodifiableMap(processors);
    }


@@ -0,0 +1,221 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.common;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CsvProcessorTests extends ESTestCase {

    private static final Character[] SEPARATORS = new Character[]{',', ';', '|', '.'};
    private final String quote;
    private char separator;

    public CsvProcessorTests(@Name("quote") String quote) {
        this.quote = quote;
    }

    @ParametersFactory
    public static Iterable<Object[]> parameters() {
        return Arrays.asList(new Object[]{"'"}, new Object[]{"\""}, new Object[]{""});
    }

    @Before
    public void setup() {
        separator = randomFrom(SEPARATORS);
    }

    public void testExactNumberOfFields() throws Exception {
        int numItems = randomIntBetween(2, 10);
        Map<String, String> items = new LinkedHashMap<>();
        for (int i = 0; i < numItems; i++) {
            items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
        }
        String[] headers = items.keySet().toArray(new String[numItems]);
        String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));

        IngestDocument ingestDocument = processDocument(headers, csv);

        items.forEach((key, value) -> assertEquals(value, ingestDocument.getFieldValue(key, String.class)));
    }

    public void testLessFieldsThanHeaders() throws Exception {
        int numItems = randomIntBetween(4, 10);
        Map<String, String> items = new LinkedHashMap<>();
        for (int i = 0; i < numItems; i++) {
            items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
        }
        String[] headers = items.keySet().toArray(new String[numItems]);
        String csv = items.values().stream().map(v -> quote + v + quote).limit(3).collect(Collectors.joining(separator + ""));

        IngestDocument ingestDocument = processDocument(headers, csv);

        items.keySet().stream().skip(3).forEach(key -> assertFalse(ingestDocument.hasField(key)));
        items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(),
            ingestDocument.getFieldValue(e.getKey(), String.class)));
    }

    public void testLessHeadersThanFields() throws Exception {
        int numItems = randomIntBetween(5, 10);
        Map<String, String> items = new LinkedHashMap<>();
        for (int i = 0; i < numItems; i++) {
            items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
        }
        String[] headers = items.keySet().stream().limit(3).toArray(String[]::new);
        String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));

        IngestDocument ingestDocument = processDocument(headers, csv);

        items.entrySet().stream().limit(3).forEach(e -> assertEquals(e.getValue(),
            ingestDocument.getFieldValue(e.getKey(), String.class)));
    }

    public void testSingleField() throws Exception {
        String[] headers = new String[]{randomAlphaOfLengthBetween(5, 10)};
        String value = randomAlphaOfLengthBetween(5, 10);
        String csv = quote + value + quote;

        IngestDocument ingestDocument = processDocument(headers, csv);

        assertEquals(value, ingestDocument.getFieldValue(headers[0], String.class));
    }

    public void testEscapedQuote() throws Exception {
        int numItems = randomIntBetween(2, 10);
        Map<String, String> items = new LinkedHashMap<>();
        for (int i = 0; i < numItems; i++) {
            items.put(randomAlphaOfLengthBetween(5, 10),
                randomAlphaOfLengthBetween(5, 10) + quote + quote + randomAlphaOfLengthBetween(5, 10) + quote + quote);
        }
        String[] headers = items.keySet().toArray(new String[numItems]);
        String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));

        IngestDocument ingestDocument = processDocument(headers, csv);

        items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote),
            ingestDocument.getFieldValue(key, String.class)));
    }

    public void testQuotedStrings() throws Exception {
        assumeFalse("quote needed", quote.isEmpty());
        int numItems = randomIntBetween(2, 10);
        Map<String, String> items = new LinkedHashMap<>();
        for (int i = 0; i < numItems; i++) {
            items.put(randomAlphaOfLengthBetween(5, 10),
                separator + randomAlphaOfLengthBetween(5, 10) + separator + "\n\r" + randomAlphaOfLengthBetween(5, 10));
        }
        String[] headers = items.keySet().toArray(new String[numItems]);
        String csv = items.values().stream().map(v -> quote + v + quote).collect(Collectors.joining(separator + ""));

        IngestDocument ingestDocument = processDocument(headers, csv);

        items.forEach((key, value) -> assertEquals(value.replace(quote + quote, quote),
            ingestDocument.getFieldValue(key, String.class)));
    }

    public void testEmptyFields() throws Exception {
        int numItems = randomIntBetween(5, 10);
        Map<String, String> items = new LinkedHashMap<>();
        for (int i = 0; i < numItems; i++) {
            items.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
        }
        String[] headers = items.keySet().toArray(new String[numItems]);
        String csv =
            items.values().stream().map(v -> quote + v + quote).limit(numItems - 1).skip(3).collect(Collectors.joining(separator + ""));

        IngestDocument ingestDocument = processDocument(headers,
            "" + separator + "" + separator + "" + separator + csv + separator + separator + "abc");

        items.keySet().stream().limit(3).forEach(key -> assertFalse(ingestDocument.hasField(key)));
        items.entrySet().stream().limit(numItems - 1).skip(3).forEach(e -> assertEquals(e.getValue(),
            ingestDocument.getFieldValue(e.getKey(), String.class)));
        items.keySet().stream().skip(numItems - 1).forEach(key -> assertFalse(ingestDocument.hasField(key)));
    }

    public void testWrongStrings() throws Exception {
        assumeTrue("single run only", quote.isEmpty());
        expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\"abc"));
        expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "\"abc\"asd"));
        expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "\"abcasd"));
        expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\nabc"));
        expectThrows(IllegalArgumentException.class, () -> processDocument(new String[]{"a"}, "abc\rabc"));
    }

    public void testQuotedWhitespaces() throws Exception {
        assumeFalse("quote needed", quote.isEmpty());
        IngestDocument document = processDocument(new String[]{"a", "b", "c", "d"},
            " abc " + separator + " def" + separator + "ghi " + separator + " " + quote + " ooo " + quote);
        assertEquals("abc", document.getFieldValue("a", String.class));
        assertEquals("def", document.getFieldValue("b", String.class));
        assertEquals("ghi", document.getFieldValue("c", String.class));
        assertEquals(" ooo ", document.getFieldValue("d", String.class));
    }

    public void testUntrimmed() throws Exception {
        assumeFalse("quote needed", quote.isEmpty());
        IngestDocument document = processDocument(new String[]{"a", "b", "c", "d", "e", "f"},
            " abc " + separator + " def" + separator + "ghi " + separator + " "
                + quote + "ooo" + quote + " " + separator + " " + quote + "jjj" + quote + " ", false);
        assertEquals(" abc ", document.getFieldValue("a", String.class));
        assertEquals(" def", document.getFieldValue("b", String.class));
        assertEquals("ghi ", document.getFieldValue("c", String.class));
        assertEquals("ooo", document.getFieldValue("d", String.class));
        assertEquals("jjj", document.getFieldValue("e", String.class));
        assertFalse(document.hasField("f"));
    }

    public void testEmptyHeaders() throws Exception {
        assumeTrue("single run only", quote.isEmpty());
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "abc,abc");
        HashMap<String, Object> metadata = new HashMap<>(ingestDocument.getSourceAndMetadata());

        CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, new String[0], false, ',', '"', false);
        processor.execute(ingestDocument);

        assertEquals(metadata, ingestDocument.getSourceAndMetadata());
    }

    private IngestDocument processDocument(String[] headers, String csv) throws Exception {
        return processDocument(headers, csv, true);
    }

    private IngestDocument processDocument(String[] headers, String csv, boolean trim) throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, csv);

        char quoteChar = quote.isEmpty() ? '"' : quote.charAt(0);
        CsvProcessor processor = new CsvProcessor(randomAlphaOfLength(5), fieldName, headers, trim, separator, quoteChar, false);
        processor.execute(ingestDocument);

        return ingestDocument;
    }
}


@@ -0,0 +1,164 @@
---
teardown:
  - do:
      ingest.delete_pipeline:
        id: "my_pipeline"
        ignore: 404

---
"Test CSV Processor defaults":
  - do:
      ingest.put_pipeline:
        id: "my_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "csv": {
                  "field": "value",
                  "target_fields": ["a", "b", "c"]
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      index:
        index: test
        id: 1
        pipeline: "my_pipeline"
        body: >
          {
            "value": "aa,bb,cc"
          }

  - do:
      get:
        index: test
        id: 1
  - match: { _source.a: "aa" }
  - match: { _source.b: "bb" }
  - match: { _source.c: "cc" }

---
"Test CSV Processor quote and separator":
  - do:
      ingest.put_pipeline:
        id: "my_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "csv": {
                  "field": "value",
                  "target_fields": ["a", "b", "c", "d", "e"],
                  "quote": "'",
                  "separator": ";"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      index:
        index: test
        id: 1
        pipeline: "my_pipeline"
        body: >
          {
            "value": "'aa';'b;b';'cc';d,d;'ee''ee'"
          }

  - do:
      get:
        index: test
        id: 1
  - match: { _source.a: "aa" }
  - match: { _source.b: "b;b" }
  - match: { _source.c: "cc" }
  - match: { _source.d: "d,d" }
  - match: { _source.e: "ee'ee" }

---
"Test CSV Processor trim":
  - do:
      ingest.put_pipeline:
        id: "my_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "csv": {
                  "field": "value",
                  "target_fields": ["a", "b", "c"],
                  "trim": true,
                  "quote": "'"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      index:
        index: test
        id: 1
        pipeline: "my_pipeline"
        body: >
          {
            "value": " aa, bb , 'cc'"
          }

  - do:
      get:
        index: test
        id: 1
  - match: { _source.a: "aa" }
  - match: { _source.b: "bb" }
  - match: { _source.c: "cc" }

---
"Test CSV Processor trim log":
  - do:
      ingest.put_pipeline:
        id: "my_pipeline"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "csv": {
                  "field": "value",
                  "target_fields": ["date", "level", "server", "id", "msg"],
                  "trim": true,
                  "separator": "|"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      index:
        index: test
        id: 1
        pipeline: "my_pipeline"
        body: >
          {
            "value": "2018-01-06 16:56:14.295748|INFO |VirtualServer |1 |listening on 0.0.0.0:9987, :::9987"
          }

  - do:
      get:
        index: test
        id: 1
  - match: { _source.date: "2018-01-06 16:56:14.295748" }
  - match: { _source.level: "INFO" }
  - match: { _source.server: "VirtualServer" }
  - match: { _source.id: "1" }
  - match: { _source.msg: "listening on 0.0.0.0:9987, :::9987" }
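
Tying the last scenario back to the parser: a hedged sketch of the same trim-log line driven through `CsvParser` directly, under the same scaffolding assumptions as the earlier parser snippet (`doc` is a pre-built `IngestDocument`; not part of this commit):

[source,java]
--------------------------------------------------
// '|' is the separator and trim is enabled, matching the pipeline above.
String[] headers = new String[]{"date", "level", "server", "id", "msg"};
new CsvParser(doc, '"', '|', true, headers).process(
    "2018-01-06 16:56:14.295748|INFO |VirtualServer |1 |listening on 0.0.0.0:9987, :::9987");
// Trailing padding in the unquoted fields is trimmed:
// level = "INFO", server = "VirtualServer", id = "1"
--------------------------------------------------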