From 557d6365bf27ee2271cd9d3a147361acb6cc9c6c Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 9 Aug 2016 10:18:26 -0400 Subject: [PATCH] NIFI-2518: Added unit test showing issue NIFI-2518: Added support for fractional seconds to AbstractDatabaseFetchProcessor This closes #821 --- .../AbstractDatabaseFetchProcessor.java | 21 +++++--- .../standard/QueryDatabaseTableTest.java | 52 ++++++++++++++++++- 2 files changed, 65 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 6182d93482..7b304795f0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -35,6 +35,7 @@ import java.sql.Statement; import java.sql.Timestamp; import java.text.DecimalFormat; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -135,6 +136,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact protected final static Map dbAdapters = new HashMap<>(); protected final Map columnTypeMap = new HashMap<>(); + private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); + static { // Load the DatabaseAdapters ServiceLoader dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class); @@ -275,14 +278,19 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact break; case TIME: - Date rawColTimeValue = resultSet.getDate(columnIndex); - java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime()); - java.sql.Time maxTimeValue = null; + // Compare milliseconds-since-epoch. Need getTimestamp() instead of getTime() since some databases + // don't return milliseconds in the Time returned by getTime(). + Date colTimeValue = new Date(resultSet.getTimestamp(columnIndex).getTime()); + Date maxTimeValue = null; if (maxValueString != null) { - maxTimeValue = java.sql.Time.valueOf(maxValueString); + try { + maxTimeValue = TIME_TYPE_FORMAT.parse(maxValueString); + } catch (ParseException pe) { + // Shouldn't happen, but just in case, leave the value as null so the new value will be stored + } } if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) { - return colTimeValue.toString(); + return TIME_TYPE_FORMAT.format(colTimeValue); } break; @@ -299,8 +307,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact return oracleTimestampValue.toString(); } } else { - Timestamp rawColTimestampValue = resultSet.getTimestamp(columnIndex); - java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime()); + Timestamp colTimestampValue = resultSet.getTimestamp(columnIndex); java.sql.Timestamp maxTimestampValue = null; if (maxValueString != null) { maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 5aec89921f..a373f8fe59 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -278,8 +278,58 @@ public class QueryDatabaseTableTest { runner.clearTransferState(); } - @Test + public void testTimestampNanos() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a lower timestamp (but same millisecond value), no flow file should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.000')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a higher timestamp, one flow file should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + } + + @Test public void testWithNullIntColumn() throws SQLException { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();