NIFI-4792: Add support for querying array fields in QueryRecord

Work in progress adding support for array based queries
updated calcite dependency

tidy up unused imports highlighted by checkstyle in travis build

tidy up }s highlighted by checkstyle in travis build

Add test for use case referenced in NIFI-4792

Bumped Calcite version to 1.21.0

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4015
This commit is contained in:
David Savage 2020-01-25 17:27:47 +00:00 committed by Matthew Burgess
parent 8faea04ff1
commit 5e964fbc47
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
5 changed files with 214 additions and 6 deletions

View File

@ -610,14 +610,15 @@ public class QueryRecord extends AbstractProcessor {
if (record == null) {
return null;
}
if (record instanceof Record) {
return eval((Record) record, recordPath);
}
if (record instanceof Record[]) {
return eval((Record[]) record, recordPath);
}
if (record instanceof Iterable) {
return eval((Iterable<Record>) record, recordPath);
}
if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath);
}
@ -645,6 +646,18 @@ public class QueryRecord extends AbstractProcessor {
return evalResults(selectedFields);
}
private Object eval(final Iterable<Record> records, final String recordPath) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final List<FieldValue> selectedFields = new ArrayList<>();
for (final Record record : records) {
final RecordPathResult result = compiled.evaluate(record);
result.getSelectedFields().forEach(selectedFields::add);
}
return evalResults(selectedFields);
}
private Object eval(final Record[] records, final String recordPath) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
@ -794,6 +807,8 @@ public class QueryRecord extends AbstractProcessor {
return eval((Record) record, recordPath, transform);
} else if (record instanceof Record[]) {
return eval((Record[]) record, recordPath, transform);
} else if (record instanceof Iterable) {
return eval((Iterable<Record>) record, recordPath, transform);
} else if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath, transform);
}
@ -837,6 +852,23 @@ public class QueryRecord extends AbstractProcessor {
return evalResults(selectedFields.stream(), transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.");
}
private <T> T eval(final Iterable<Record> records, final String recordPath, final Function<Object, T> transform) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final List<FieldValue> selectedFields = new ArrayList<>();
for (final Record record : records) {
final RecordPathResult result = compiled.evaluate(record);
result.getSelectedFields().forEach(selectedFields::add);
}
if (selectedFields.isEmpty()) {
return null;
}
return evalResults(selectedFields.stream(), transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.");
}
private <T> T evalResults(final Stream<FieldValue> fields, final Function<Object, T> transform, final Supplier<String> multipleReturnValueErrorSupplier) {
return fields.map(FieldValue::getValue)
.filter(Objects::nonNull)

View File

@ -27,6 +27,9 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
public class FlowFileEnumerator implements Enumerator<Object> {
private final ProcessSession session;
@ -111,12 +114,26 @@ public class FlowFileEnumerator implements Enumerator<Object> {
final Object[] filtered = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
final int indexToKeep = fields[i];
filtered[i] = row[indexToKeep];
filtered[i] = cast(row[indexToKeep]);
}
return filtered;
}
private Object cast(Object o) {
if (o == null) {
return null;
} else if (o.getClass().isArray()) {
List<Object> l = new ArrayList(Array.getLength(o));
for (int i = 0; i < Array.getLength(o); i++) {
l.add(Array.get(o, i));
}
return l;
} else {
return o;
}
}
@Override
public void reset() {
if (rawIn != null) {

View File

@ -42,6 +42,7 @@ import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import java.lang.reflect.Type;
import java.math.BigInteger;
@ -51,7 +52,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class FlowFileTable extends AbstractTable implements QueryableTable, TranslatableTable {
private final RecordReaderFactory recordReaderFactory;
@ -214,7 +214,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
case STRING:
return typeFactory.createJavaType(String.class);
case ARRAY:
return typeFactory.createJavaType(Object[].class);
ArrayDataType array = (ArrayDataType) fieldType;
return typeFactory.createArrayType(getRelDataType(array.getElementType(), typeFactory), -1);
case RECORD:
return typeFactory.createJavaType(Record.class);
case MAP:

View File

@ -42,6 +42,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@ -212,6 +213,132 @@ public class TestQueryRecord {
assertEquals("Software Engineer", output.getValue("title"));
}
@Test
public void testCollectionFunctionsWithArray() throws InitializationException {
final Record record = createHierarchicalArrayRecord();
final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema());
recordReader.addRecord(record);
final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema());
TestRunner runner = getRunner();
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(REL_NAME,
"SELECT title, name" +
" FROM FLOWFILE" +
" WHERE CARDINALITY(addresses) > 1");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(REL_NAME, 1);
final List<Record> written = writer.getRecordsWritten();
assertEquals(1, written.size());
final Record output = written.get(0);
assertEquals("John Doe", output.getValue("name"));
assertEquals("Software Engineer", output.getValue("title"));
}
@Test
public void testCollectionFunctionsWithWhereClause() throws InitializationException {
final Record sample = createTaggedRecord("1", "a", "b", "c");
final ArrayListRecordReader recordReader = new ArrayListRecordReader(sample.getSchema());
recordReader.addRecord(createTaggedRecord("1", "a", "d", "g"));
recordReader.addRecord(createTaggedRecord("2", "b", "e"));
recordReader.addRecord(createTaggedRecord("3", "c", "f", "h"));
final ArrayListRecordWriter writer = new ArrayListRecordWriter(sample.getSchema());
TestRunner runner = getRunner();
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(REL_NAME,
"SELECT id, tags FROM FLOWFILE CROSS JOIN UNNEST(FLOWFILE.tags) AS f(tag) WHERE tag IN ('a','b')");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(REL_NAME, 1);
final List<Record> written = writer.getRecordsWritten();
assertEquals(2, written.size());
final Record output0 = written.get(0);
assertEquals("1", output0.getValue("id"));
assertArrayEquals(new Object[]{"a", "d", "g"}, (Object[]) output0.getValue("tags"));
final Record output1 = written.get(1);
assertEquals("2", output1.getValue("id"));
assertArrayEquals(new Object[]{"b", "e"}, (Object[]) output1.getValue("tags"));
}
@Test
public void testArrayColumnWithIndex() throws InitializationException {
final Record sample = createTaggedRecord("1", "a", "b", "c");
final ArrayListRecordReader recordReader = new ArrayListRecordReader(sample.getSchema());
recordReader.addRecord(createTaggedRecord("1", "a", "d", "g"));
recordReader.addRecord(createTaggedRecord("2", "b", "e"));
recordReader.addRecord(createTaggedRecord("3", "c", "f", "h"));
final ArrayListRecordWriter writer = new ArrayListRecordWriter(sample.getSchema());
TestRunner runner = getRunner();
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(REL_NAME,
"SELECT id, tags[1] as first, tags[2] as \"second\", tags[CARDINALITY(tags)] as last FROM FLOWFILE");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(REL_NAME, 1);
final List<Record> written = writer.getRecordsWritten();
assertEquals(3, written.size());
final Record output0 = written.get(0);
assertEquals("1", output0.getValue("id"));
assertEquals("a", output0.getValue("first"));
assertEquals("d", output0.getValue("second"));
assertEquals("g", output0.getValue("last"));
final Record output1 = written.get(1);
assertEquals("2", output1.getValue("id"));
assertEquals("b", output1.getValue("first"));
assertEquals("e", output1.getValue("second"));
assertEquals("e", output1.getValue("last"));
final Record output2 = written.get(2);
assertEquals("3", output2.getValue("id"));
assertEquals("c", output2.getValue("first"));
assertEquals("f", output2.getValue("second"));
assertEquals("h", output2.getValue("last"));
}
@Test
public void testCompareResultsOfTwoRecordPathsAgainstArray() throws InitializationException {
final Record record = createHierarchicalArrayRecord();
@ -382,6 +509,12 @@ public class TestQueryRecord {
* "mother": {
* "name": "Jane Doe"
* }
* },
* "favouriteThings": {
* "sport": "basketball",
* "color": "green",
* "roses": "raindrops",
* "kittens": "whiskers"
* }
* }
* </pre></code>
@ -438,6 +571,31 @@ public class TestQueryRecord {
}
/**
* Returns a Record that, if written in JSON, would look like:
* <code><pre>
* {
* "id": &gt;id&lt;,
* "tags": [&gt;tag1&lt;,&gt;tag2&lt;...]
* }
* </pre></code>
*
* @return the Record
*/
private Record createTaggedRecord(String id, String...tags) {
final List<RecordField> recordSchemaFields = new ArrayList<>();
recordSchemaFields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
recordSchemaFields.add(new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema recordSchema = new SimpleRecordSchema(recordSchemaFields);
final Map<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("tags", Arrays.asList(tags));
final Record record = new MapRecord(recordSchema, map);
return record;
}
/**
* Returns a Record that, if written in JSON, would look like:
* <code><pre>

View File

@ -372,7 +372,7 @@
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.17.0</version>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>