NIFI-2624: Avro logical types for ExecuteSQL and QueryDatabaseTable

- Added logical type support for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP column types (a schema sketch follows below).
- Added logical type 'decimal' to AvroReader so that it can consume Avro records with logical types written by ExecuteSQL and QueryDatabaseTable.
- Added JdbcCommon.AvroConversionOptions to consolidate conversion options.
- Added a 'Use Avro Logical Types' property to ExecuteSQL and QueryDatabaseTable to toggle whether logical types are used.
- Added a 'mime.type' FlowFile attribute of 'application/avro-binary' so that output FlowFiles can be displayed by the content viewer.
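For illustration, a minimal sketch of the field schema that a hypothetical DECIMAL(10,2) column yields when 'Use Avro Logical Types' is enabled, assuming Avro 1.8's LogicalTypes and SchemaBuilder APIs (the precision/scale of 10,2 is an example value, not from this commit):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// DECIMAL(10,2) maps to Avro bytes with the 'decimal' logical type attached.
Schema decimalSchema = LogicalTypes.decimal(10, 2)
        .addToSchema(SchemaBuilder.builder().bytesType());
// decimalSchema.toString() yields:
// {"type":"bytes","logicalType":"decimal","precision":10,"scale":2}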

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1798
Koji Kawamura 2017-01-24 12:16:52 +09:00 committed by Matt Burgess
parent 20a1fc24d7
commit 1811ba5681
10 changed files with 364 additions and 46 deletions

View File

@@ -17,7 +17,9 @@
package org.apache.nifi.avro;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@@ -39,6 +41,8 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
@@ -61,6 +65,7 @@ public class AvroTypeUtil {
private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOGICAL_TYPE_DECIMAL = "decimal";
public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
if (recordSchema == null) {
@@ -107,6 +112,10 @@ public class AvroTypeUtil {
case LOGICAL_TYPE_TIMESTAMP_MILLIS:
case LOGICAL_TYPE_TIMESTAMP_MICROS:
return RecordFieldType.TIMESTAMP.getDataType();
case LOGICAL_TYPE_DECIMAL:
// We convert Decimal to Double.
// Alternatively, we could convert it to String, but a numeric type is generally preferable for users.
return RecordFieldType.DOUBLE.getDataType();
}
}
@@ -262,6 +271,10 @@
* Convert a raw value to an Avro object for serialization in the Avro type system.
* The counterpart method, which reads an Avro object back into a raw value, is {@link #normalizeValue(Object, Schema)}.
*/
public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) {
return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName());
}
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
if (rawValue == null) {
return null;
@@ -311,6 +324,19 @@
}
case BYTES:
case FIXED:
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
final BigDecimal decimal;
if (rawValue instanceof BigDecimal) {
decimal = (BigDecimal) rawValue;
} else if (rawValue instanceof Double) {
decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
}
return new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType);
}
if (rawValue instanceof byte[]) {
return ByteBuffer.wrap((byte[]) rawValue);
}
@@ -529,6 +555,10 @@
return new MapRecord(childSchema, values);
case BYTES:
final ByteBuffer bb = (ByteBuffer) value;
final LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
return new Conversions.DecimalConversion().fromBytes(bb, avroSchema, logicalType);
}
return AvroTypeUtil.convertByteArray(bb.array());
case FIXED:
final GenericFixed fixed = (GenericFixed) value;
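As a reference for the conversions above, a minimal round-trip sketch using Avro's Conversions.DecimalConversion, assuming Avro 1.8+ and a hypothetical decimal(10,2) schema:

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Attach the 'decimal' logical type to a bytes schema, then round-trip a value.
final Schema schema = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
final Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
final ByteBuffer bytes = conversion.toBytes(new BigDecimal("123.45"), schema, schema.getLogicalType());
final BigDecimal restored = conversion.fromBytes(bytes, schema, schema.getLogicalType()); // 123.45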

View File

@@ -174,8 +174,8 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@@ -143,16 +143,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("dbf-normalize")
.displayName("Normalize Table/Column Names")
.description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ "will be changed to underscores in order to build a valid Avro record.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
protected List<PropertyDescriptor> propDescriptors;
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)

View File

@@ -42,6 +42,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -54,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database"})
@@ -107,16 +111,6 @@ public class ExecuteSQL extends AbstractProcessor {
.sensitive(false)
.build();
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("dbf-normalize")
.displayName("Normalize Table/Column Names")
.description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ "will be changed to underscores in order to build a valid Avro record.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
@@ -130,6 +124,7 @@ public class ExecuteSQL extends AbstractProcessor {
pds.add(SQL_SELECT_QUERY);
pds.add(QUERY_TIMEOUT);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
pds.add(USE_AVRO_LOGICAL_TYPES);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -172,6 +167,7 @@
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final StopWatch stopWatch = new StopWatch(true);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -202,7 +198,10 @@
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
final ResultSet resultSet = st.executeQuery(selectQuery);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes).build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) {
throw new ProcessException(e);
}
@@ -211,6 +210,7 @@
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});

View File

@@ -35,6 +35,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -67,6 +68,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@EventDriven
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -155,6 +159,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(MAX_FRAGMENTS);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
pds.add(USE_AVRO_LOGICAL_TYPES);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -202,7 +207,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0;
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.recordName(tableName)
.maxRows(maxRowsPerFlowFile)
.convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
.useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
.build();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
@@ -284,7 +294,7 @@
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro));
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
} catch (SQLException | RuntimeException e) {
throw new ProcessException("Error during database query or conversion of records to Avro.", e);
}
@@ -299,6 +309,7 @@
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
if (maxRowsPerFlowFile > 0) {
fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));

View File

@@ -55,22 +55,59 @@ import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Date;
import java.util.function.Function;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.BaseTypeBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.NullDefault;
import org.apache.avro.SchemaBuilder.UnionAccumulator;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
/**
* JDBC / SQL common functions.
*/
public class JdbcCommon {
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("dbf-normalize")
.displayName("Normalize Table/Column Names")
.description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ "will be changed to underscores in order to build a valid Avro record.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
.name("dbf-user-logical-types")
.displayName("Use Avro Logical Types")
.description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
+ "If disabled, written as string. "
+ "If enabled, Logical types are used and written as its underlying type, specifically, "
+ "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
+ "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), "
+ "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, "
+ "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+ "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
private static final int MAX_DIGITS_IN_BIGINT = 19;
private static final int MAX_DIGITS_IN_INT = 9;
@@ -90,7 +127,69 @@ public class JdbcCommon {
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows, boolean convertNames)
throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName, convertNames);
final AvroConversionOptions options = AvroConversionOptions.builder()
.recordName(recordName)
.maxRows(maxRows)
.convertNames(convertNames)
.useLogicalTypes(false).build();
return convertToAvroStream(rs, outStream, options, callback);
}
public static class AvroConversionOptions {
private final String recordName;
private final int maxRows;
private final boolean convertNames;
private final boolean useLogicalTypes;
private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
this.recordName = recordName;
this.maxRows = maxRows;
this.convertNames = convertNames;
this.useLogicalTypes = useLogicalTypes;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private String recordName;
private int maxRows = 0;
private boolean convertNames = false;
private boolean useLogicalTypes = false;
/**
* Specify an a priori record name to use if one cannot be determined from the result set.
*/
public Builder recordName(String recordName) {
this.recordName = recordName;
return this;
}
public Builder maxRows(int maxRows) {
this.maxRows = maxRows;
return this;
}
public Builder convertNames(boolean convertNames) {
this.convertNames = convertNames;
return this;
}
public Builder useLogicalTypes(boolean useLogicalTypes) {
this.useLogicalTypes = useLogicalTypes;
return this;
}
public AvroConversionOptions build() {
return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
}
}
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final AvroConversionOptions options, final ResultSetRowCallback callback)
throws SQLException, IOException {
final Schema schema = createSchema(rs, options);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -106,6 +205,7 @@ public class JdbcCommon {
}
for (int i = 1; i <= nrOfColumns; i++) {
final int javaSqlType = meta.getColumnType(i);
final Schema fieldSchema = schema.getFields().get(i - 1).schema();
// Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
if (javaSqlType == CLOB) {
@@ -171,8 +271,13 @@
//MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
rec.put(i - 1, ((Short) value).intValue());
} else if (value instanceof BigDecimal) {
// Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
rec.put(i - 1, value.toString());
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else if (value instanceof BigInteger) {
// Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
@@ -208,6 +313,15 @@
rec.put(i - 1, value);
}
} else if (value instanceof Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else {
// The different types that we support are numbers (int, long, double, float),
// as well as boolean values and Strings. Since Avro doesn't provide
@@ -219,7 +333,7 @@
dataFileWriter.append(rec);
nrOfRows += 1;
if (maxRows > 0 && nrOfRows == maxRows)
if (options.maxRows > 0 && nrOfRows == options.maxRows)
break;
}
@@ -231,19 +345,33 @@
return createSchema(rs, null, false);
}
public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
final AvroConversionOptions options = AvroConversionOptions.builder().recordName(recordName).convertNames(convertNames).build();
return createSchema(rs, options);
}
private static void addNullableField(
FieldAssembler<Schema> builder,
String columnName,
Function<BaseTypeBuilder<UnionAccumulator<NullDefault<Schema>>>, UnionAccumulator<NullDefault<Schema>>> func
) {
final BaseTypeBuilder<UnionAccumulator<NullDefault<Schema>>> and = builder.name(columnName).type().unionOf().nullBuilder().endNull().and();
func.apply(and).endUnion().noDefault();
}
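// For example (illustrative only; the field name "created" is hypothetical), the helper above turns
//   addNullableField(builder, "created",
//       u -> u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType())));
// into a field whose schema is the union ["null", {"type":"long","logicalType":"timestamp-millis"}]
// with no default value.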
/**
* Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a
* fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value.
*
* @param rs The result set to convert to Avro
* @param recordName The a priori record name to use if it cannot be determined from the result set.
* @param rs The result set to convert to Avro
* @param options Options that control the conversion, such as the record name, max rows, name normalization, and logical type usage
* @return A Schema object representing the result set converted to an Avro record
* @throws SQLException if any error occurs during conversion
*/
public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
public static Schema createSchema(final ResultSet rs, AvroConversionOptions options) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName;
String tableName = StringUtils.isEmpty(options.recordName) ? "NiFi_ExecuteSQL_Record" : options.recordName;
if (nrOfColumns > 0) {
String tableNameFromMeta = meta.getTableName(1);
if (!StringUtils.isBlank(tableNameFromMeta)) {
@@ -251,7 +379,7 @@
}
}
if (convertNames) {
if (options.convertNames) {
tableName = normalizeNameForAvro(tableName);
}
@@ -267,7 +395,7 @@
* check for alias. Postgres is the one that has the null column names for calculated fields.
*/
String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i) : meta.getColumnName(i);
String columnName = convertNames ? normalizeNameForAvro(nameOrLabel) : nameOrLabel;
String columnName = options.convertNames ? normalizeNameForAvro(nameOrLabel) : nameOrLabel;
switch (meta.getColumnType(i)) {
case CHAR:
case LONGNVARCHAR:
@@ -324,17 +452,39 @@
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
// Since Avro 1.8, LogicalType is supported.
case DECIMAL:
case NUMERIC:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
final LogicalTypes.Decimal decimal = options.useLogicalTypes
? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
addNullableField(builder, columnName,
u -> options.useLogicalTypes
? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
: u.stringType());
break;
// Did not find direct suitable type, need to be clarified!!!!
case DATE:
addNullableField(builder, columnName,
u -> options.useLogicalTypes
? u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()))
: u.stringType());
break;
case TIME:
addNullableField(builder, columnName,
u -> options.useLogicalTypes
? u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()))
: u.stringType());
break;
case TIMESTAMP:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
addNullableField(builder, columnName,
u -> options.useLogicalTypes
? u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()))
: u.stringType());
break;
case BINARY:
@@ -371,4 +521,5 @@
public interface ResultSetRowCallback {
void processRow(ResultSet resultSet) throws IOException;
}
}
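To make the encodings described by the 'Use Avro Logical Types' property above concrete, a small sketch of the raw values written for each logical type; the sample values are hypothetical and the day arithmetic assumes a UTC JVM time zone:

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

final Timestamp ts = Timestamp.valueOf("2017-05-10 12:34:56.789");
final long timestampMillis = ts.getTime();                                 // logical 'timestamp-millis': long
final int dateDays = (int) TimeUnit.MILLISECONDS.toDays(timestampMillis); // logical 'date': int days since 1970-01-01
// Logical 'time-millis' is likewise an int of milliseconds, and logical 'decimal' is the
// two's-complement unscaled value as bytes, with precision and scale carried in the schema.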

View File

@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,17 +40,28 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.input.ReaderInputStream;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -350,11 +362,15 @@ public class TestJdbcCommon {
@Test
public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
final BigDecimal bigDecimal = new BigDecimal(38D);
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
when(metadata.getColumnName(1)).thenReturn("The.Chairman");
when(metadata.getTableName(1)).thenReturn("1the::table");
when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
@@ -367,24 +383,28 @@
}
}).when(rs).next();
final BigDecimal bigDecimal = new BigDecimal(38D);
when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(rs, baos, true);
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
.builder().convertNames(true).useLogicalTypes(true).build();
JdbcCommon.convertToAvroStream(rs, baos, options, null);
final byte[] serializedBytes = baos.toByteArray();
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertEquals("_1the__table", record.getSchema().getName());
assertEquals(bigDecimal.toString(), record.get("The_Chairman").toString());
assertEquals(bigDecimal, record.get("The_Chairman"));
}
}
}
@@ -526,6 +546,100 @@ public class TestJdbcCommon {
}
}
@Test
public void testConvertToAvroStreamForDateTimeAsString() throws SQLException, IOException, ParseException {
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
.builder().convertNames(true).useLogicalTypes(false).build();
testConvertToAvroStreamForDateTime(options,
(record, date) -> assertEquals(new Utf8(date.toString()), record.get("date")),
(record, time) -> assertEquals(new Utf8(time.toString()), record.get("time")),
(record, timestamp) -> assertEquals(new Utf8(timestamp.toString()), record.get("timestamp"))
);
}
@Test
public void testConvertToAvroStreamForDateTimeAsLogicalType() throws SQLException, IOException, ParseException {
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
.builder().convertNames(true).useLogicalTypes(true).build();
testConvertToAvroStreamForDateTime(options,
(record, date) -> {
final int daysSinceEpoch = (int) record.get("date");
final long millisSinceEpoch = TimeUnit.MILLISECONDS.convert(daysSinceEpoch, TimeUnit.DAYS);
assertEquals(date, new java.sql.Date(millisSinceEpoch));
},
(record, time) -> assertEquals(time, new Time((int) record.get("time"))),
(record, timestamp) -> assertEquals(timestamp, new Timestamp((long) record.get("timestamp")))
);
}
private void testConvertToAvroStreamForDateTime(
JdbcCommon.AvroConversionOptions options, BiConsumer<GenericRecord, java.sql.Date> assertDate,
BiConsumer<GenericRecord, Time> assertTime, BiConsumer<GenericRecord, Timestamp> assertTimeStamp)
throws SQLException, IOException, ParseException {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
BiFunction<String, String, Long> toMillis = (format, dateStr) -> {
try {
final SimpleDateFormat dateFormat = new SimpleDateFormat(format);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.parse(dateStr).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
};
when(metadata.getColumnCount()).thenReturn(3);
when(metadata.getTableName(anyInt())).thenReturn("table");
when(metadata.getColumnType(1)).thenReturn(Types.DATE);
when(metadata.getColumnName(1)).thenReturn("date");
final java.sql.Date date = new java.sql.Date(toMillis.apply("yyyy/MM/dd", "2017/05/10"));
when(rs.getObject(1)).thenReturn(date);
when(metadata.getColumnType(2)).thenReturn(Types.TIME);
when(metadata.getColumnName(2)).thenReturn("time");
final Time time = new Time(toMillis.apply("HH:mm:ss.SSS", "12:34:56.789"));
when(rs.getObject(2)).thenReturn(time);
when(metadata.getColumnType(3)).thenReturn(Types.TIMESTAMP);
when(metadata.getColumnName(3)).thenReturn("timestamp");
final Timestamp timestamp = new Timestamp(toMillis.apply("yyyy/MM/dd HH:mm:ss.SSS", "2017/05/11 19:59:39.123"));
when(rs.getObject(3)).thenReturn(timestamp);
final AtomicInteger counter = new AtomicInteger(1);
Mockito.doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
return counter.getAndDecrement() > 0;
}
}).when(rs).next();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(rs, baos, options, null);
final byte[] serializedBytes = baos.toByteArray();
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertDate.accept(record, date);
assertTime.accept(record, time);
assertTimeStamp.accept(record, timestamp);
}
}
}
// Many tests use Derby as the database, so ensure the driver is available
@Test
public void testDriverLoad() throws ClassNotFoundException {

View File

@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
@@ -82,6 +83,7 @@ public class TestAvroReaderWithEmbeddedSchema {
final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
final long millisSinceMidnight = secondsSinceMidnight * 1000L;
final BigDecimal bigDecimal = new BigDecimal("123.45");
final byte[] serialized;
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -94,6 +96,7 @@
record.put("timestampMillis", timeLong);
record.put("timestampMicros", timeLong * 1000L);
record.put("date", 17260);
record.put("decimal", ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray()));
writer.append(record);
writer.flush();
@@ -110,6 +113,7 @@
assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMillis").get().getFieldType());
assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMicros").get().getFieldType());
assertEquals(RecordFieldType.DATE, recordSchema.getDataType("date").get().getFieldType());
assertEquals(RecordFieldType.DOUBLE, recordSchema.getDataType("decimal").get().getFieldType());
final Record record = reader.nextRecord();
assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMillis"));
@@ -119,6 +123,7 @@
final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
assertEquals(bigDecimal.doubleValue(), record.getValue("decimal"));
}
}

View File

@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
@@ -40,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericRecord;
@@ -74,9 +76,11 @@ public abstract class TestWriteAvroResult {
fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
// Avro decimal is represented as a double in the NiFi type system.
fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final String expectedTime = "2017-04-04 14:20:33.000";
final String expectedTime = "2017-04-04 14:20:33.789";
final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long timeLong = df.parse(expectedTime).getTime();
@@ -87,6 +91,9 @@
values.put("timestampMillis", new Timestamp(timeLong));
values.put("timestampMicros", new Timestamp(timeLong));
values.put("date", new Date(timeLong));
// Avro decimal is represented as a double in the NiFi type system.
final BigDecimal expectedDecimal = new BigDecimal("123.45");
values.put("decimal", expectedDecimal.doubleValue());
final Record record = new MapRecord(recordSchema, values);
final byte[] data;
@@ -98,17 +105,20 @@
try (final InputStream in = new ByteArrayInputStream(data)) {
final GenericRecord avroRecord = readRecord(in, schema);
final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
final long millisSinceMidnight = secondsSinceMidnight * 1000L;
final long millisSinceMidnight = (secondsSinceMidnight * 1000L) + 789;
assertEquals((int) millisSinceMidnight, avroRecord.get("timeMillis"));
assertEquals(millisSinceMidnight * 1000L, avroRecord.get("timeMicros"));
assertEquals(timeLong, avroRecord.get("timestampMillis"));
assertEquals(timeLong * 1000L, avroRecord.get("timestampMicros"));
assertEquals(17260, avroRecord.get("date"));
// The double value is converted into a logical decimal because the Avro schema defines the 'decimal' logical type.
final Schema decimalSchema = schema.getField("decimal").schema();
final BigDecimal decimal = new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal"), decimalSchema, decimalSchema.getLogicalType());
assertEquals(expectedDecimal, decimal);
}
}
@Test
public void testDataTypes() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc"));

View File

@@ -29,6 +29,13 @@
"type" : "int",
"logicalType" : "date"
}
}, {
"name" : "decimal", "type": {
"type" : "bytes",
"logicalType" : "decimal",
"precision" : 5,
"scale" : 2
}
}
]
}