From b6ef7e13bf076fb88fd94ce49d2a217db3f19aaa Mon Sep 17 00:00:00 2001 From: pcgrenier Date: Fri, 15 May 2020 20:03:01 -0400 Subject: [PATCH] NIFI-7462: This adds a way to convert or cast a choice object into a valid type for use with calcite query functions NIFI-7462: Update to allow FlowFile Table's schema to be more intelligent when using CHOICE types NIFI-7462: Fixed checkstyle violation, removed documentation around the CAST functions that were no longer needed Signed-off-by: Matthew Burgess This closes #4282 --- .../nifi/processors/standard/QueryRecord.java | 26 ++-- .../nifi/queryrecord/FlowFileTable.java | 58 ++++++++ .../processors/standard/TestQueryRecord.java | 129 ++++++++++++++++++ 3 files changed, 199 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 82aea6fbc1..a620a60030 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -784,12 +784,6 @@ public class QueryRecord extends AbstractProcessor { } } - public static class RecordRecordPath extends RecordPathFunction { - public Record eval(Object record, String recordPath) { - return eval(record, recordPath, Record.class::cast); - } - } - public static class RecordPathFunction { private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())); @@ -803,14 +797,18 @@ public class QueryRecord extends AbstractProcessor { return null; } - if (record instanceof Record) { - return eval((Record) record, recordPath, transform); - } else if (record instanceof Record[]) { - return eval((Record[]) record, recordPath, transform); - } else if (record instanceof Iterable) { - return eval((Iterable) record, recordPath, transform); - } else if (record instanceof Map) { - return eval((Map) record, recordPath, transform); + try { + if (record instanceof Record) { + return eval((Record) record, recordPath, transform); + } else if (record instanceof Record[]) { + return eval((Record[]) record, recordPath, transform); + } else if (record instanceof Iterable) { + return eval((Iterable) record, recordPath, transform); + } else if (record instanceof Map) { + return eval((Map) record, recordPath, transform); + } + } catch (IllegalArgumentException e) { + throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against " + record, e); } throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java index 30300086ca..18cbc63329 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java @@ -43,6 +43,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import java.lang.reflect.Type; import java.math.BigInteger; @@ -223,12 +224,69 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran case BIGINT: return typeFactory.createJavaType(BigInteger.class); case CHOICE: + final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType; + DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0); + for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) { + if (possibleType == widestDataType) { + continue; + } + if (possibleType.getFieldType().isWiderThan(widestDataType.getFieldType())) { + widestDataType = possibleType; + continue; + } + if (widestDataType.getFieldType().isWiderThan(possibleType.getFieldType())) { + continue; + } + + // Neither is wider than the other. + widestDataType = null; + break; + } + + // If one of the CHOICE data types is the widest, use it. + if (widestDataType != null) { + return getRelDataType(widestDataType, typeFactory); + } + + // None of the data types is strictly the widest. Check if all data types are numeric. + // This would happen, for instance, if the data type is a choice between float and integer. + // If that is the case, we can use a String type for the table schema because all values will fit + // into a String. This will still allow for casting, etc. if the query requires it. + boolean allNumeric = true; + for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) { + if (!isNumeric(possibleType)) { + allNumeric = false; + break; + } + } + + if (allNumeric) { + return typeFactory.createJavaType(String.class); + } + + // There is no specific type that we can use for the schema. This would happen, for instance, if our + // CHOICE is between an integer and a Record. return typeFactory.createJavaType(Object.class); } throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType); } + private boolean isNumeric(final DataType dataType) { + switch (dataType.getFieldType()) { + case BIGINT: + case BYTE: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case SHORT: + return true; + default: + return false; + } + } + @Override public TableType getJdbcTableType() { return TableType.TEMPORARY_TABLE; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 0c269806c9..a2c2b1968a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -41,6 +41,7 @@ import org.junit.Test; import java.io.IOException; import java.io.OutputStream; +import java.math.BigDecimal; import java.sql.SQLException; import java.util.Arrays; import java.util.ArrayList; @@ -248,6 +249,130 @@ public class TestQueryRecord { assertEquals("Software Engineer", output.getValue("title")); } + @Test + public void testCollectionFunctionsWithoutCastFailure() throws InitializationException { + final Record record = createHierarchicalArrayRecord(); + final Record record2 = createHierarchicalArrayRecord(); + record2.setValue("height", 30); + + final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema()); + recordReader.addRecord(record); + recordReader.addRecord(record2); + final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema()); + + TestRunner runner = getRunner(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(REL_NAME, + "SELECT title, name, sum(height) as height_total " + + "FROM FLOWFILE " + + "GROUP BY title, name"); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + + final List written = writer.getRecordsWritten(); + assertEquals(1, written.size()); + + final Record output = written.get(0); + assertEquals("John Doe", output.getValue("name")); + assertEquals("Software Engineer", output.getValue("title")); + assertEquals(BigDecimal.valueOf(90.5D), output.getValue("height_total")); + } + + @Test + public void testCollectionFunctionsWithCastChoice() throws InitializationException { + final Record record = createHierarchicalArrayRecord(); + + final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema()); + recordReader.addRecord(record); + recordReader.addRecord(record); + + final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema()); + + TestRunner runner = getRunner(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(REL_NAME, + "SELECT title, name, " + + "sum(CAST(height AS DOUBLE)) as height_total_double, " + + "sum(CAST(height AS REAL)) as height_total_float " + + "FROM FLOWFILE " + + "GROUP BY title, name"); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + + final List written = writer.getRecordsWritten(); + assertEquals(1, written.size()); + + final Number height = 121.0; + final Record output = written.get(0); + assertEquals("John Doe", output.getValue("name")); + assertEquals("Software Engineer", output.getValue("title")); + assertEquals(height.doubleValue(), output.getValue("height_total_double")); + assertEquals(height.floatValue(), output.getValue("height_total_float")); + } + + @Test + public void testCollectionFunctionsWithCastChoiceWithInts() throws InitializationException { + final Record record = createHierarchicalArrayRecord(); + record.setValue("height", 30); + + final ArrayListRecordReader recordReader = new ArrayListRecordReader(record.getSchema()); + recordReader.addRecord(record); + recordReader.addRecord(record); + + final ArrayListRecordWriter writer = new ArrayListRecordWriter(record.getSchema()); + + TestRunner runner = getRunner(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(REL_NAME, + "SELECT title, name, " + + "sum(CAST(height AS INT)) as height_total_int, " + + "sum(CAST(height AS BIGINT)) as height_total_long " + + "FROM FLOWFILE " + + "GROUP BY title, name"); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + + final List written = writer.getRecordsWritten(); + assertEquals(1, written.size()); + + final Number height = 60; + final Record output = written.get(0); + assertEquals("John Doe", output.getValue("name")); + assertEquals("Software Engineer", output.getValue("title")); + assertEquals(height.longValue(), output.getValue("height_total_long")); + assertEquals(height.intValue(), output.getValue("height_total_int")); + } + @Test public void testCollectionFunctionsWithWhereClause() throws InitializationException { final Record sample = createTaggedRecord("1", "a", "b", "c"); @@ -534,6 +659,7 @@ public class TestQueryRecord { personFields.add(new RecordField("dobTimestamp", RecordFieldType.LONG.getDataType())); personFields.add(new RecordField("joinTimestamp", RecordFieldType.STRING.getDataType())); personFields.add(new RecordField("weight", RecordFieldType.DOUBLE.getDataType())); + personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.LONG.getDataType(), RecordFieldType.INT.getDataType()))); personFields.add(new RecordField("mother", RecordFieldType.RECORD.getRecordDataType(namedPersonSchema))); final RecordSchema personSchema = new SimpleRecordSchema(personFields); @@ -559,6 +685,7 @@ public class TestQueryRecord { map.put("dobTimestamp", ts); map.put("joinTimestamp", "2018-02-04 10:20:55.802"); map.put("weight", 180.8D); + map.put("height", 60.5); map.put("mother", mother); final Record person = new MapRecord(personSchema, map); @@ -640,6 +767,7 @@ public class TestQueryRecord { personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); personFields.add(new RecordField("age", RecordFieldType.INT.getDataType())); personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType())); + personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType()))); personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) )); final RecordSchema personSchema = new SimpleRecordSchema(personFields); @@ -666,6 +794,7 @@ public class TestQueryRecord { final Map map = new HashMap<>(); map.put("name", "John Doe"); map.put("age", 30); + map.put("height", 60.5); map.put("title", "Software Engineer"); map.put("addresses", new Record[] {homeAddress, workAddress}); final Record person = new MapRecord(personSchema, map);