NIFI-7462: This adds a way to convert or cast a CHOICE object into a valid type for use with Calcite query functions

NIFI-7462: Update to allow FlowFile Table's schema to be more intelligent when using CHOICE types

NIFI-7462: Fixed checkstyle violation, removed documentation around the CAST functions that were no longer needed

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4282
This commit is contained in:
pcgrenier 2020-05-15 20:03:01 -04:00 committed by Matthew Burgess
parent c7edcd68e1
commit b6ef7e13bf
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 199 additions and 14 deletions

View File

@ -784,12 +784,6 @@ public class QueryRecord extends AbstractProcessor {
}
}
/**
 * RecordPath function variant whose result is returned as a {@link Record}.
 * All evaluation work is delegated to the three-argument {@code eval} inherited
 * from {@link RecordPathFunction}; this class only supplies the final cast.
 */
public static class RecordRecordPath extends RecordPathFunction {
/**
 * Evaluates {@code recordPath} against {@code record} and casts the outcome to {@link Record}.
 *
 * @param record the value to evaluate against; typed as {@code Object} here, the accepted
 *               runtime types are decided by the inherited {@code eval}
 * @param recordPath the RecordPath expression, passed through unchanged
 * @return whatever the inherited {@code eval} yields after applying {@code Record.class::cast}
 */
public Record eval(Object record, String recordPath) {
// Record.class::cast is the transform applied by the inherited eval; presumably a
// non-Record result surfaces as a ClassCastException -- confirm against the parent implementation.
return eval(record, recordPath, Record.class::cast);
}
}
public static class RecordPathFunction {
private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
@ -803,14 +797,18 @@ public class QueryRecord extends AbstractProcessor {
return null;
}
if (record instanceof Record) {
return eval((Record) record, recordPath, transform);
} else if (record instanceof Record[]) {
return eval((Record[]) record, recordPath, transform);
} else if (record instanceof Iterable) {
return eval((Iterable<Record>) record, recordPath, transform);
} else if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath, transform);
try {
if (record instanceof Record) {
return eval((Record) record, recordPath, transform);
} else if (record instanceof Record[]) {
return eval((Record[]) record, recordPath, transform);
} else if (record instanceof Iterable) {
return eval((Iterable<Record>) record, recordPath, transform);
} else if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath, transform);
}
} catch (IllegalArgumentException e) {
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against " + record, e);
}
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");

View File

@ -43,6 +43,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import java.lang.reflect.Type;
import java.math.BigInteger;
@ -223,12 +224,69 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
case BIGINT:
return typeFactory.createJavaType(BigInteger.class);
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
if (possibleType == widestDataType) {
continue;
}
if (possibleType.getFieldType().isWiderThan(widestDataType.getFieldType())) {
widestDataType = possibleType;
continue;
}
if (widestDataType.getFieldType().isWiderThan(possibleType.getFieldType())) {
continue;
}
// Neither is wider than the other.
widestDataType = null;
break;
}
// If one of the CHOICE data types is the widest, use it.
if (widestDataType != null) {
return getRelDataType(widestDataType, typeFactory);
}
// None of the data types is strictly the widest. Check if all data types are numeric.
// This would happen, for instance, if the data type is a choice between float and integer.
// If that is the case, we can use a String type for the table schema because all values will fit
// into a String. This will still allow for casting, etc. if the query requires it.
boolean allNumeric = true;
for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
if (!isNumeric(possibleType)) {
allNumeric = false;
break;
}
}
if (allNumeric) {
return typeFactory.createJavaType(String.class);
}
// There is no specific type that we can use for the schema. This would happen, for instance, if our
// CHOICE is between an integer and a Record.
return typeFactory.createJavaType(Object.class);
}
throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
}
/**
 * Tells whether the field type of the supplied data type is one of the
 * numeric kinds: BYTE, SHORT, INT, LONG, BIGINT, FLOAT or DOUBLE.
 *
 * @param dataType the data type whose field type is inspected
 * @return {@code true} for the field types listed above, {@code false} for every other field type
 */
private boolean isNumeric(final DataType dataType) {
    final boolean numeric;
    switch (dataType.getFieldType()) {
        case BYTE:
        case SHORT:
        case INT:
        case LONG:
        case BIGINT:
        case FLOAT:
        case DOUBLE:
            numeric = true;
            break;
        default:
            numeric = false;
            break;
    }
    return numeric;
}
@Override
public TableType getJdbcTableType() {
return TableType.TEMPORARY_TABLE;

View File

@ -41,6 +41,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.ArrayList;
@ -248,6 +249,130 @@ public class TestQueryRecord {
assertEquals("Software Engineer", output.getValue("title"));
}
/**
 * Aggregates the "height" field with {@code sum(height)} and no explicit CAST in the query.
 * Two records are fed in, the second with its height overridden to 30, and the single
 * grouped output row is expected to carry a total of 90.5 as a {@link BigDecimal}.
 */
@Test
public void testCollectionFunctionsWithoutCastFailure() throws InitializationException {
    // Second record differs from the first only in its "height" value.
    // NOTE(review): the expected 90.5 implies the fixture's default height is 60.5 -- confirm in createHierarchicalArrayRecord().
    final Record firstPerson = createHierarchicalArrayRecord();
    final Record secondPerson = createHierarchicalArrayRecord();
    secondPerson.setValue("height", 30);

    final ArrayListRecordReader reader = new ArrayListRecordReader(firstPerson.getSchema());
    reader.addRecord(firstPerson);
    reader.addRecord(secondPerson);

    final ArrayListRecordWriter recordWriter = new ArrayListRecordWriter(firstPerson.getSchema());

    // Register and enable both controller services before pointing the processor at them.
    final TestRunner testRunner = getRunner();
    testRunner.addControllerService("reader", reader);
    testRunner.enableControllerService(reader);
    testRunner.addControllerService("writer", recordWriter);
    testRunner.enableControllerService(recordWriter);
    testRunner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
    testRunner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");

    final String sql =
        "SELECT title, name, sum(height) as height_total " +
        "FROM FLOWFILE " +
        "GROUP BY title, name";
    testRunner.setProperty(REL_NAME, sql);

    testRunner.enqueue(new byte[0]);
    testRunner.run();
    testRunner.assertTransferCount(REL_NAME, 1);

    // Both inputs share title and name, so the GROUP BY must collapse them into one row.
    final List<Record> results = recordWriter.getRecordsWritten();
    assertEquals(1, results.size());

    final Record grouped = results.get(0);
    assertEquals("John Doe", grouped.getValue("name"));
    assertEquals("Software Engineer", grouped.getValue("title"));
    assertEquals(BigDecimal.valueOf(90.5D), grouped.getValue("height_total"));
}
/**
 * Aggregates the "height" field after an explicit CAST to DOUBLE and to REAL.
 * The same record is read twice, and the single grouped output row is expected to
 * report a total of 121.0 as a double and as a float respectively.
 */
@Test
public void testCollectionFunctionsWithCastChoice() throws InitializationException {
    // NOTE(review): the expected 121.0 implies the fixture's default height is 60.5 -- confirm in createHierarchicalArrayRecord().
    final Record person = createHierarchicalArrayRecord();

    // The identical record instance is queued twice so the sum doubles its height.
    final ArrayListRecordReader reader = new ArrayListRecordReader(person.getSchema());
    reader.addRecord(person);
    reader.addRecord(person);

    final ArrayListRecordWriter recordWriter = new ArrayListRecordWriter(person.getSchema());

    final TestRunner testRunner = getRunner();
    testRunner.addControllerService("reader", reader);
    testRunner.enableControllerService(reader);
    testRunner.addControllerService("writer", recordWriter);
    testRunner.enableControllerService(recordWriter);
    testRunner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
    testRunner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");

    final String sql =
        "SELECT title, name, " +
        "sum(CAST(height AS DOUBLE)) as height_total_double, " +
        "sum(CAST(height AS REAL)) as height_total_float " +
        "FROM FLOWFILE " +
        "GROUP BY title, name";
    testRunner.setProperty(REL_NAME, sql);

    testRunner.enqueue(new byte[0]);
    testRunner.run();
    testRunner.assertTransferCount(REL_NAME, 1);

    final List<Record> results = recordWriter.getRecordsWritten();
    assertEquals(1, results.size());

    // One expected total, compared in the numeric type each CAST is expected to produce.
    final Number expectedTotal = 121.0;
    final Record grouped = results.get(0);
    assertEquals("John Doe", grouped.getValue("name"));
    assertEquals("Software Engineer", grouped.getValue("title"));
    assertEquals(expectedTotal.doubleValue(), grouped.getValue("height_total_double"));
    assertEquals(expectedTotal.floatValue(), grouped.getValue("height_total_float"));
}
/**
 * Aggregates the "height" field after an explicit CAST to INT and to BIGINT.
 * The record's height is set to the integer 30 and the record is read twice, so the
 * single grouped output row is expected to report 60 as an int and as a long.
 */
@Test
public void testCollectionFunctionsWithCastChoiceWithInts() throws InitializationException {
    final Record person = createHierarchicalArrayRecord();
    person.setValue("height", 30);

    // The identical record instance is queued twice so the sum doubles its height.
    final ArrayListRecordReader reader = new ArrayListRecordReader(person.getSchema());
    reader.addRecord(person);
    reader.addRecord(person);

    final ArrayListRecordWriter recordWriter = new ArrayListRecordWriter(person.getSchema());

    final TestRunner testRunner = getRunner();
    testRunner.addControllerService("reader", reader);
    testRunner.enableControllerService(reader);
    testRunner.addControllerService("writer", recordWriter);
    testRunner.enableControllerService(recordWriter);
    testRunner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
    testRunner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");

    final String sql =
        "SELECT title, name, " +
        "sum(CAST(height AS INT)) as height_total_int, " +
        "sum(CAST(height AS BIGINT)) as height_total_long " +
        "FROM FLOWFILE " +
        "GROUP BY title, name";
    testRunner.setProperty(REL_NAME, sql);

    testRunner.enqueue(new byte[0]);
    testRunner.run();
    testRunner.assertTransferCount(REL_NAME, 1);

    final List<Record> results = recordWriter.getRecordsWritten();
    assertEquals(1, results.size());

    // One expected total, compared in the numeric type each CAST is expected to produce.
    final Number expectedTotal = 60;
    final Record grouped = results.get(0);
    assertEquals("John Doe", grouped.getValue("name"));
    assertEquals("Software Engineer", grouped.getValue("title"));
    assertEquals(expectedTotal.longValue(), grouped.getValue("height_total_long"));
    assertEquals(expectedTotal.intValue(), grouped.getValue("height_total_int"));
}
@Test
public void testCollectionFunctionsWithWhereClause() throws InitializationException {
final Record sample = createTaggedRecord("1", "a", "b", "c");
@ -534,6 +659,7 @@ public class TestQueryRecord {
personFields.add(new RecordField("dobTimestamp", RecordFieldType.LONG.getDataType()));
personFields.add(new RecordField("joinTimestamp", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("weight", RecordFieldType.DOUBLE.getDataType()));
personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.LONG.getDataType(), RecordFieldType.INT.getDataType())));
personFields.add(new RecordField("mother", RecordFieldType.RECORD.getRecordDataType(namedPersonSchema)));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
@ -559,6 +685,7 @@ public class TestQueryRecord {
map.put("dobTimestamp", ts);
map.put("joinTimestamp", "2018-02-04 10:20:55.802");
map.put("weight", 180.8D);
map.put("height", 60.5);
map.put("mother", mother);
final Record person = new MapRecord(personSchema, map);
@ -640,6 +767,7 @@ public class TestQueryRecord {
personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType())));
personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) ));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
@ -666,6 +794,7 @@ public class TestQueryRecord {
final Map<String, Object> map = new HashMap<>();
map.put("name", "John Doe");
map.put("age", 30);
map.put("height", 60.5);
map.put("title", "Software Engineer");
map.put("addresses", new Record[] {homeAddress, workAddress});
final Record person = new MapRecord(personSchema, map);