mirror of https://github.com/apache/nifi.git
NIFI-2518: Added unit test showing issue
NIFI-2518: Added support for fractional seconds to AbstractDatabaseFetchProcessor This closes #821
This commit is contained in:
parent
219234d001
commit
557d6365bf
|
@ -35,6 +35,7 @@ import java.sql.Statement;
|
|||
import java.sql.Timestamp;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -135,6 +136,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
|
|||
protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
|
||||
protected final Map<String, Integer> columnTypeMap = new HashMap<>();
|
||||
|
||||
private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
|
||||
|
||||
static {
|
||||
// Load the DatabaseAdapters
|
||||
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
|
||||
|
@ -275,14 +278,19 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
|
|||
break;
|
||||
|
||||
case TIME:
|
||||
Date rawColTimeValue = resultSet.getDate(columnIndex);
|
||||
java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime());
|
||||
java.sql.Time maxTimeValue = null;
|
||||
// Compare milliseconds-since-epoch. Need getTimestamp() instead of getTime() since some databases
|
||||
// don't return milliseconds in the Time returned by getTime().
|
||||
Date colTimeValue = new Date(resultSet.getTimestamp(columnIndex).getTime());
|
||||
Date maxTimeValue = null;
|
||||
if (maxValueString != null) {
|
||||
maxTimeValue = java.sql.Time.valueOf(maxValueString);
|
||||
try {
|
||||
maxTimeValue = TIME_TYPE_FORMAT.parse(maxValueString);
|
||||
} catch (ParseException pe) {
|
||||
// Shouldn't happen, but just in case, leave the value as null so the new value will be stored
|
||||
}
|
||||
}
|
||||
if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
|
||||
return colTimeValue.toString();
|
||||
return TIME_TYPE_FORMAT.format(colTimeValue);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -299,8 +307,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
|
|||
return oracleTimestampValue.toString();
|
||||
}
|
||||
} else {
|
||||
Timestamp rawColTimestampValue = resultSet.getTimestamp(columnIndex);
|
||||
java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
|
||||
Timestamp colTimestampValue = resultSet.getTimestamp(columnIndex);
|
||||
java.sql.Timestamp maxTimestampValue = null;
|
||||
if (maxValueString != null) {
|
||||
maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString);
|
||||
|
|
|
@ -278,8 +278,58 @@ public class QueryDatabaseTableTest {
|
|||
runner.clearTransferState();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTimestampNanos() throws ClassNotFoundException, SQLException, InitializationException, IOException {
|
||||
|
||||
// load test data to database
|
||||
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
|
||||
Statement stmt = con.createStatement();
|
||||
|
||||
try {
|
||||
stmt.execute("drop table TEST_QUERY_DB_TABLE");
|
||||
} catch (final SQLException sqle) {
|
||||
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
|
||||
}
|
||||
|
||||
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
|
||||
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')");
|
||||
|
||||
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
|
||||
runner.setIncomingConnection(false);
|
||||
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
|
||||
InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
|
||||
assertEquals(1, getNumberOfRecordsFromStream(in));
|
||||
runner.clearTransferState();
|
||||
|
||||
// Run again, this time no flowfiles/rows should be transferred
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Add a new row with a lower timestamp (but same millisecond value), no flow file should be transferred
|
||||
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.000')");
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Add a new row with a higher timestamp, one flow file should be transferred
|
||||
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')");
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
|
||||
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
|
||||
assertEquals(1, getNumberOfRecordsFromStream(in));
|
||||
runner.clearTransferState();
|
||||
|
||||
// Run again, this time no flowfiles/rows should be transferred
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
|
||||
runner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithNullIntColumn() throws SQLException {
|
||||
// load test data to database
|
||||
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
|
||||
|
|
Loading…
Reference in New Issue