NIFI-2262: Added Avro-normalization of table/column names in SQL processors

This closes #994.
This commit is contained in:
Matt Burgess 2016-09-07 13:53:47 -04:00 committed by Pierre Villard
parent 9ff5c0b25f
commit d325773760
8 changed files with 73 additions and 34 deletions

View File

@ -131,6 +131,16 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("dbf-normalize")
.displayName("Normalize Table/Column Names")
.description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ "will be changed to underscores in order to build a valid Avro record.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
protected List<PropertyDescriptor> propDescriptors;
public static final PropertyDescriptor DB_TYPE;

View File

@ -107,6 +107,16 @@ public class ExecuteSQL extends AbstractProcessor {
.sensitive(false)
.build();
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("dbf-normalize")
.displayName("Normalize Table/Column Names")
.description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ "will be changed to underscores in order to build a valid Avro record.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
@ -119,6 +129,7 @@ public class ExecuteSQL extends AbstractProcessor {
pds.add(DBCP_SERVICE);
pds.add(SQL_SELECT_QUERY);
pds.add(QUERY_TIMEOUT);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -160,6 +171,7 @@ public class ExecuteSQL extends AbstractProcessor {
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final StopWatch stopWatch = new StopWatch(true);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@ -190,7 +202,7 @@ public class ExecuteSQL extends AbstractProcessor {
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
final ResultSet resultSet = st.executeQuery(selectQuery);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
} catch (final SQLException e) {
throw new ProcessException(e);
}

View File

@ -134,6 +134,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
propDescriptors = Collections.unmodifiableList(pds);
}
@ -178,6 +179,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
@ -248,7 +250,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile));
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro));
} catch (SQLException | RuntimeException e) {
throw new ProcessException("Error during database query or conversion of records to Avro.", e);
}

View File

@ -70,22 +70,23 @@ public class JdbcCommon {
private static final int MAX_DIGITS_IN_BIGINT = 19;
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, null);
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, null, convertNames);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName)
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, boolean convertNames)
throws SQLException, IOException {
return convertToAvroStream(rs, outStream, recordName, null);
return convertToAvroStream(rs, outStream, recordName, null, convertNames);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) throws IOException, SQLException {
return convertToAvroStream(rs, outStream, recordName, callback, 0);
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, boolean convertNames)
throws IOException, SQLException {
return convertToAvroStream(rs, outStream, recordName, callback, 0, convertNames);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows)
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows, boolean convertNames)
throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName);
final Schema schema = createSchema(rs, recordName, convertNames);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@ -160,7 +161,7 @@ public class JdbcCommon {
dataFileWriter.append(rec);
nrOfRows += 1;
if(maxRows > 0 && nrOfRows == maxRows)
if (maxRows > 0 && nrOfRows == maxRows)
break;
}
@ -169,7 +170,7 @@ public class JdbcCommon {
}
public static Schema createSchema(final ResultSet rs) throws SQLException {
return createSchema(rs, null);
return createSchema(rs, null, false);
}
/**
@ -181,7 +182,7 @@ public class JdbcCommon {
* @return A Schema object representing the result set converted to an Avro record
* @throws SQLException if any error occurs during conversion
*/
public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException {
public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName;
@ -192,12 +193,17 @@ public class JdbcCommon {
}
}
if (convertNames) {
tableName = normalizeNameForAvro(tableName);
}
final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
/**
* Some missing Avro types - Decimal, Date types. May need some additional work.
*/
for (int i = 1; i <= nrOfColumns; i++) {
String columnName = convertNames ? normalizeNameForAvro(meta.getColumnName(i)) : meta.getColumnName(i);
switch (meta.getColumnType(i)) {
case CHAR:
case LONGNVARCHAR:
@ -205,25 +211,25 @@ public class JdbcCommon {
case NCHAR:
case NVARCHAR:
case VARCHAR:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
case BIT:
case BOOLEAN:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
break;
case INTEGER:
if (meta.isSigned(i)) {
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
} else {
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
}
break;
case SMALLINT:
case TINYINT:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
break;
case BIGINT:
@ -232,38 +238,38 @@ public class JdbcCommon {
// to strings as necessary
int precision = meta.getPrecision(i);
if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
} else {
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
}
break;
// java.sql.RowId is interface, is seems to be database
// implementation specific, let's convert to String
case ROWID:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
case FLOAT:
case REAL:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
break;
case DOUBLE:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DECIMAL:
case NUMERIC:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DATE:
case TIME:
case TIMESTAMP:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
case BINARY:
@ -272,7 +278,7 @@ public class JdbcCommon {
case ARRAY:
case BLOB:
case CLOB:
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
break;
@ -284,6 +290,14 @@ public class JdbcCommon {
return builder.endRecord();
}
public static String normalizeNameForAvro(String inputName) {
String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
if (Character.isDigit(normalizedName.charAt(0))) {
normalizedName = "_" + normalizedName;
}
return normalizedName;
}
/**
* An interface for callback methods which allows processing of a row during the convertToAvroStream() processing.
* <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.

View File

@ -153,7 +153,7 @@ public class TestJdbcCommon {
final ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream);
JdbcCommon.convertToAvroStream(resultSet, outStream, false);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
@ -287,8 +287,8 @@ public class TestJdbcCommon {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
when(metadata.getColumnName(1)).thenReturn("Chairman");
when(metadata.getTableName(1)).thenReturn("table");
when(metadata.getColumnName(1)).thenReturn("The.Chairman");
when(metadata.getTableName(1)).thenReturn("1the::table");
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
@ -306,7 +306,7 @@ public class TestJdbcCommon {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(rs, baos);
JdbcCommon.convertToAvroStream(rs, baos, true);
final byte[] serializedBytes = baos.toByteArray();
@ -317,7 +317,8 @@ public class TestJdbcCommon {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);
assertEquals(bigDecimal.toString(), record.get("Chairman").toString());
assertEquals("_1the__table", record.getSchema().getName());
assertEquals(bigDecimal.toString(), record.get("The_Chairman").toString());
}
}
}

View File

@ -83,7 +83,7 @@ public class TestJdbcHugeStream {
+ " from persons PER, products PRD, relationships REL");
final OutputStream outStream = new FileOutputStream("target/data.avro");
final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream, false);
// Deserialize bytes to records
final InputStream instream = new FileInputStream("target/data.avro");

View File

@ -91,7 +91,7 @@ public class TestJdbcTypesDerby {
final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream);
JdbcCommon.convertToAvroStream(resultSet, outStream, false);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);

View File

@ -99,7 +99,7 @@ public class TestJdbcTypesH2 {
// final ResultSet resultSet = st.executeQuery("select U.somebinary from users U");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream);
JdbcCommon.convertToAvroStream(resultSet, outStream, false);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);