diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 5136e67d97..82aea6fbc1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -610,14 +610,15 @@ public class QueryRecord extends AbstractProcessor { if (record == null) { return null; } - if (record instanceof Record) { return eval((Record) record, recordPath); } if (record instanceof Record[]) { return eval((Record[]) record, recordPath); } - + if (record instanceof Iterable) { + return eval((Iterable) record, recordPath); + } if (record instanceof Map) { return eval((Map) record, recordPath); } @@ -645,6 +646,18 @@ public class QueryRecord extends AbstractProcessor { return evalResults(selectedFields); } + private Object eval(final Iterable records, final String recordPath) { + final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath); + + final List selectedFields = new ArrayList<>(); + for (final Record record : records) { + final RecordPathResult result = compiled.evaluate(record); + result.getSelectedFields().forEach(selectedFields::add); + } + + return evalResults(selectedFields); + } + private Object eval(final Record[] records, final String recordPath) { final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath); @@ -794,6 +807,8 @@ public class QueryRecord extends AbstractProcessor { return eval((Record) record, recordPath, transform); } else if (record instanceof Record[]) { return eval((Record[]) record, recordPath, transform); + } else if (record instanceof Iterable) { + return eval((Iterable) record, recordPath, transform); } else if (record instanceof Map) { return eval((Map) record, recordPath, transform); } @@ -837,6 +852,23 @@ public class QueryRecord extends AbstractProcessor { return evalResults(selectedFields.stream(), transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained."); } + private T eval(final Iterable records, final String recordPath, final Function transform) { + final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath); + + final List selectedFields = new ArrayList<>(); + for (final Record record : records) { + final RecordPathResult result = compiled.evaluate(record); + result.getSelectedFields().forEach(selectedFields::add); + } + + if (selectedFields.isEmpty()) { + return null; + } + + return evalResults(selectedFields.stream(), transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained."); + } + + private T evalResults(final Stream fields, final Function transform, final Supplier multipleReturnValueErrorSupplier) { return fields.map(FieldValue::getValue) .filter(Objects::nonNull) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java index 06ffb765c5..e4814ec554 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java @@ -27,6 +27,9 @@ import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; import java.io.InputStream; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; public class FlowFileEnumerator implements Enumerator { private final ProcessSession session; @@ -111,12 +114,26 @@ public class FlowFileEnumerator implements Enumerator { final Object[] filtered = new Object[fields.length]; for (int i = 0; i < fields.length; i++) { final int indexToKeep = fields[i]; - filtered[i] = row[indexToKeep]; + filtered[i] = cast(row[indexToKeep]); } return filtered; } + private Object cast(Object o) { + if (o == null) { + return null; + } else if (o.getClass().isArray()) { + List l = new ArrayList(Array.getLength(o)); + for (int i = 0; i < Array.getLength(o); i++) { + l.add(Array.get(o, i)); + } + return l; + } else { + return o; + } + } + @Override public void reset() { if (rawIn != null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java index 8c0e2ce8c6..30300086ca 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java @@ -42,6 +42,7 @@ import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; import java.lang.reflect.Type; import java.math.BigInteger; @@ -51,7 +52,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; - public class FlowFileTable extends AbstractTable implements QueryableTable, TranslatableTable { private final RecordReaderFactory recordReaderFactory; @@ -214,7 +214,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran case STRING: return typeFactory.createJavaType(String.class); case ARRAY: - return typeFactory.createJavaType(Object[].class); + ArrayDataType array = (ArrayDataType) fieldType; + return typeFactory.createArrayType(getRelDataType(array.getElementType(), typeFactory), -1); case RECORD: return typeFactory.createJavaType(Record.class); case MAP: diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 5b1732768e..0c269806c9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -42,6 +42,7 @@ import org.junit.Test; import java.io.IOException; import java.io.OutputStream; import java.sql.SQLException; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -212,6 +213,132 @@ public class TestQueryRecord { assertEquals("Software Engineer", output.getValue("title")); } + @Test + public void testCollectionFunctionsWithArray() throws InitializationException { + final Record record = createHierarchicalArrayRecord(); + final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema()); + recordReader.addRecord(record); + + final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema()); + + TestRunner runner = getRunner(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(REL_NAME, + "SELECT title, name" + + " FROM FLOWFILE" + + " WHERE CARDINALITY(addresses) > 1"); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + + final List written = writer.getRecordsWritten(); + assertEquals(1, written.size()); + + final Record output = written.get(0); + assertEquals("John Doe", output.getValue("name")); + assertEquals("Software Engineer", output.getValue("title")); + } + + @Test + public void testCollectionFunctionsWithWhereClause() throws InitializationException { + final Record sample = createTaggedRecord("1", "a", "b", "c"); + + final ArrayListRecordReader recordReader = new ArrayListRecordReader(sample.getSchema()); + recordReader.addRecord(createTaggedRecord("1", "a", "d", "g")); + recordReader.addRecord(createTaggedRecord("2", "b", "e")); + recordReader.addRecord(createTaggedRecord("3", "c", "f", "h")); + + + final ArrayListRecordWriter writer = new ArrayListRecordWriter(sample.getSchema()); + + TestRunner runner = getRunner(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(REL_NAME, + "SELECT id, tags FROM FLOWFILE CROSS JOIN UNNEST(FLOWFILE.tags) AS f(tag) WHERE tag IN ('a','b')"); + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + + final List written = writer.getRecordsWritten(); + assertEquals(2, written.size()); + + final Record output0 = written.get(0); + assertEquals("1", output0.getValue("id")); + assertArrayEquals(new Object[]{"a", "d", "g"}, (Object[]) output0.getValue("tags")); + + final Record output1 = written.get(1); + assertEquals("2", output1.getValue("id")); + assertArrayEquals(new Object[]{"b", "e"}, (Object[]) output1.getValue("tags")); + } + + + @Test + public void testArrayColumnWithIndex() throws InitializationException { + final Record sample = createTaggedRecord("1", "a", "b", "c"); + + final ArrayListRecordReader recordReader = new ArrayListRecordReader(sample.getSchema()); + recordReader.addRecord(createTaggedRecord("1", "a", "d", "g")); + recordReader.addRecord(createTaggedRecord("2", "b", "e")); + recordReader.addRecord(createTaggedRecord("3", "c", "f", "h")); + + + final ArrayListRecordWriter writer = new ArrayListRecordWriter(sample.getSchema()); + + TestRunner runner = getRunner(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(REL_NAME, + "SELECT id, tags[1] as first, tags[2] as \"second\", tags[CARDINALITY(tags)] as last FROM FLOWFILE"); + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + + final List written = writer.getRecordsWritten(); + assertEquals(3, written.size()); + + final Record output0 = written.get(0); + assertEquals("1", output0.getValue("id")); + assertEquals("a", output0.getValue("first")); + assertEquals("d", output0.getValue("second")); + assertEquals("g", output0.getValue("last")); + + final Record output1 = written.get(1); + assertEquals("2", output1.getValue("id")); + assertEquals("b", output1.getValue("first")); + assertEquals("e", output1.getValue("second")); + assertEquals("e", output1.getValue("last")); + + final Record output2 = written.get(2); + assertEquals("3", output2.getValue("id")); + assertEquals("c", output2.getValue("first")); + assertEquals("f", output2.getValue("second")); + assertEquals("h", output2.getValue("last")); + } + @Test public void testCompareResultsOfTwoRecordPathsAgainstArray() throws InitializationException { final Record record = createHierarchicalArrayRecord(); @@ -382,6 +509,12 @@ public class TestQueryRecord { * "mother": { * "name": "Jane Doe" * } + * }, + * "favouriteThings": { + * "sport": "basketball", + * "color": "green", + * "roses": "raindrops", + * "kittens": "whiskers" * } * } * @@ -438,6 +571,31 @@ public class TestQueryRecord { } + /** + * Returns a Record that, if written in JSON, would look like: + *
+     * {
+     *    "id": >id<,
+     *    "tags": [>tag1<,>tag2<...]
+     * }
+     * 
+ * + * @return the Record + */ + private Record createTaggedRecord(String id, String...tags) { + final List recordSchemaFields = new ArrayList<>(); + recordSchemaFields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + recordSchemaFields.add(new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); + final RecordSchema recordSchema = new SimpleRecordSchema(recordSchemaFields); + + final Map map = new HashMap<>(); + map.put("id", id); + map.put("tags", Arrays.asList(tags)); + + final Record record = new MapRecord(recordSchema, map); + return record; + } + /** * Returns a Record that, if written in JSON, would look like: *
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index c110d606c6..f87d71c191 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -372,7 +372,7 @@
             
                 org.apache.calcite
                 calcite-core
-                1.17.0
+                1.21.0
             
             
                 org.apache.avro