From d32577376021c94a8d713d6f55b22a6aff60b57d Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 7 Sep 2016 13:53:47 -0400 Subject: [PATCH] NIFI-2262: Added Avro-normalization of table/column names in SQL processors This closes #994. --- .../AbstractDatabaseFetchProcessor.java | 10 +++ .../nifi/processors/standard/ExecuteSQL.java | 14 ++++- .../standard/QueryDatabaseTable.java | 4 +- .../processors/standard/util/JdbcCommon.java | 62 ++++++++++++------- .../standard/util/TestJdbcCommon.java | 11 ++-- .../standard/util/TestJdbcHugeStream.java | 2 +- .../standard/util/TestJdbcTypesDerby.java | 2 +- .../standard/util/TestJdbcTypesH2.java | 2 +- 8 files changed, 73 insertions(+), 34 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 962e74effd..eda93287e3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -131,6 +131,16 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); + public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() + .name("dbf-normalize") + .displayName("Normalize Table/Column Names") + .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods " + + "will be changed to underscores in order to build a valid Avro record.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + protected List propDescriptors; public static final PropertyDescriptor DB_TYPE; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 1c514b4616..14283c1204 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -107,6 +107,16 @@ public class ExecuteSQL extends AbstractProcessor { .sensitive(false) .build(); + public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() + .name("dbf-normalize") + .displayName("Normalize Table/Column Names") + .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods " + + "will be changed to underscores in order to build a valid Avro record.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + private final List propDescriptors; public ExecuteSQL() { @@ -119,6 +129,7 @@ public class ExecuteSQL extends AbstractProcessor { pds.add(DBCP_SERVICE); pds.add(SQL_SELECT_QUERY); pds.add(QUERY_TIMEOUT); + pds.add(NORMALIZE_NAMES_FOR_AVRO); propDescriptors = Collections.unmodifiableList(pds); } @@ -160,6 +171,7 @@ public class ExecuteSQL extends AbstractProcessor { final ComponentLog logger = getLogger(); final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); final StopWatch stopWatch = new StopWatch(true); final String selectQuery; if (context.getProperty(SQL_SELECT_QUERY).isSet()) { @@ -190,7 +202,7 @@ public class ExecuteSQL extends AbstractProcessor { try { logger.debug("Executing query {}", new Object[]{selectQuery}); final ResultSet resultSet = st.executeQuery(selectQuery); - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out)); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro)); } catch (final SQLException e) { throw new ProcessException(e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index ed578543ba..31bec276f0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -134,6 +134,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { pds.add(QUERY_TIMEOUT); pds.add(FETCH_SIZE); pds.add(MAX_ROWS_PER_FLOW_FILE); + pds.add(NORMALIZE_NAMES_FOR_AVRO); propDescriptors = Collections.unmodifiableList(pds); } @@ -178,6 +179,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger(); + final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); final Map maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); @@ -248,7 +250,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { // Max values will be updated in the state property map by the callback final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); try { - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro)); } catch (SQLException | RuntimeException e) { throw new ProcessException("Error during database query or conversion of records to Avro.", e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 8d81b34a01..11ba141b1e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -70,22 +70,23 @@ public class JdbcCommon { private static final int MAX_DIGITS_IN_BIGINT = 19; - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { - return convertToAvroStream(rs, outStream, null, null); + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException { + return convertToAvroStream(rs, outStream, null, null, convertNames); } - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName) + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, boolean convertNames) throws SQLException, IOException { - return convertToAvroStream(rs, outStream, recordName, null); + return convertToAvroStream(rs, outStream, recordName, null, convertNames); } - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) throws IOException, SQLException { - return convertToAvroStream(rs, outStream, recordName, callback, 0); + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, boolean convertNames) + throws IOException, SQLException { + return convertToAvroStream(rs, outStream, recordName, callback, 0, convertNames); } - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows) + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows, boolean convertNames) throws SQLException, IOException { - final Schema schema = createSchema(rs, recordName); + final Schema schema = createSchema(rs, recordName, convertNames); final GenericRecord rec = new GenericData.Record(schema); final DatumWriter datumWriter = new GenericDatumWriter<>(schema); @@ -160,7 +161,7 @@ public class JdbcCommon { dataFileWriter.append(rec); nrOfRows += 1; - if(maxRows > 0 && nrOfRows == maxRows) + if (maxRows > 0 && nrOfRows == maxRows) break; } @@ -169,7 +170,7 @@ public class JdbcCommon { } public static Schema createSchema(final ResultSet rs) throws SQLException { - return createSchema(rs, null); + return createSchema(rs, null, false); } /** @@ -181,7 +182,7 @@ public class JdbcCommon { * @return A Schema object representing the result set converted to an Avro record * @throws SQLException if any error occurs during conversion */ - public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException { + public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException { final ResultSetMetaData meta = rs.getMetaData(); final int nrOfColumns = meta.getColumnCount(); String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName; @@ -192,12 +193,17 @@ public class JdbcCommon { } } + if (convertNames) { + tableName = normalizeNameForAvro(tableName); + } + final FieldAssembler builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); /** * Some missing Avro types - Decimal, Date types. May need some additional work. */ for (int i = 1; i <= nrOfColumns; i++) { + String columnName = convertNames ? normalizeNameForAvro(meta.getColumnName(i)) : meta.getColumnName(i); switch (meta.getColumnType(i)) { case CHAR: case LONGNVARCHAR: @@ -205,25 +211,25 @@ public class JdbcCommon { case NCHAR: case NVARCHAR: case VARCHAR: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); break; case BIT: case BOOLEAN: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault(); break; case INTEGER: if (meta.isSigned(i)) { - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); } else { - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); } break; case SMALLINT: case TINYINT: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault(); break; case BIGINT: @@ -232,38 +238,38 @@ public class JdbcCommon { // to strings as necessary int precision = meta.getPrecision(i); if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) { - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); } else { - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault(); } break; // java.sql.RowId is interface, is seems to be database // implementation specific, let's convert to String case ROWID: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); break; case FLOAT: case REAL: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault(); break; case DOUBLE: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault(); break; // Did not find direct suitable type, need to be clarified!!!! case DECIMAL: case NUMERIC: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); break; // Did not find direct suitable type, need to be clarified!!!! case DATE: case TIME: case TIMESTAMP: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); break; case BINARY: @@ -272,7 +278,7 @@ public class JdbcCommon { case ARRAY: case BLOB: case CLOB: - builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault(); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault(); break; @@ -284,6 +290,14 @@ public class JdbcCommon { return builder.endRecord(); } + public static String normalizeNameForAvro(String inputName) { + String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_"); + if (Character.isDigit(normalizedName.charAt(0))) { + normalizedName = "_" + normalizedName; + } + return normalizedName; + } + /** * An interface for callback methods which allows processing of a row during the convertToAvroStream() processing. * IMPORTANT: This method should only work on the row pointed at by the current ResultSet reference. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java index 37933a18b1..b66c1782a8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -153,7 +153,7 @@ public class TestJdbcCommon { final ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R"); final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - JdbcCommon.convertToAvroStream(resultSet, outStream); + JdbcCommon.convertToAvroStream(resultSet, outStream, false); final byte[] serializedBytes = outStream.toByteArray(); assertNotNull(serializedBytes); @@ -287,8 +287,8 @@ public class TestJdbcCommon { final ResultSetMetaData metadata = mock(ResultSetMetaData.class); when(metadata.getColumnCount()).thenReturn(1); when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC); - when(metadata.getColumnName(1)).thenReturn("Chairman"); - when(metadata.getTableName(1)).thenReturn("table"); + when(metadata.getColumnName(1)).thenReturn("The.Chairman"); + when(metadata.getTableName(1)).thenReturn("1the::table"); final ResultSet rs = mock(ResultSet.class); when(rs.getMetaData()).thenReturn(metadata); @@ -306,7 +306,7 @@ public class TestJdbcCommon { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - JdbcCommon.convertToAvroStream(rs, baos); + JdbcCommon.convertToAvroStream(rs, baos, true); final byte[] serializedBytes = baos.toByteArray(); @@ -317,7 +317,8 @@ public class TestJdbcCommon { GenericRecord record = null; while (dataFileReader.hasNext()) { record = dataFileReader.next(record); - assertEquals(bigDecimal.toString(), record.get("Chairman").toString()); + assertEquals("_1the__table", record.getSchema().getName()); + assertEquals(bigDecimal.toString(), record.get("The_Chairman").toString()); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java index 19161ee3bc..499127b8e7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java @@ -83,7 +83,7 @@ public class TestJdbcHugeStream { + " from persons PER, products PRD, relationships REL"); final OutputStream outStream = new FileOutputStream("target/data.avro"); - final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream, false); // Deserialize bytes to records final InputStream instream = new FileInputStream("target/data.avro"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java index 79a1c84fc2..2c3eb58aa7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java @@ -91,7 +91,7 @@ public class TestJdbcTypesDerby { final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U"); final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - JdbcCommon.convertToAvroStream(resultSet, outStream); + JdbcCommon.convertToAvroStream(resultSet, outStream, false); final byte[] serializedBytes = outStream.toByteArray(); assertNotNull(serializedBytes); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java index 7d31c21247..c4f6071684 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java @@ -99,7 +99,7 @@ public class TestJdbcTypesH2 { // final ResultSet resultSet = st.executeQuery("select U.somebinary from users U"); final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - JdbcCommon.convertToAvroStream(resultSet, outStream); + JdbcCommon.convertToAvroStream(resultSet, outStream, false); final byte[] serializedBytes = outStream.toByteArray(); assertNotNull(serializedBytes);