From b22500d0a322794aa8a3e95b570bb58e6d62d33e Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Mon, 15 Aug 2016 15:50:47 -0600 Subject: [PATCH] NIFI-2576 Allowing PutSQL to use timestamps in epoch or string format This closes #869. Signed-off-by: Bryan Bende --- .../nifi/processors/standard/PutSQL.java | 21 ++++++++- .../nifi/processors/standard/TestPutSQL.java | 43 +++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 5b1a048163..55d3855e9a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -54,6 +54,8 @@ import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; @@ -154,6 +156,8 @@ public class PutSQL extends AbstractProcessor { private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; private static final String FRAGMENT_COUNT_ATTR = "fragment.count"; + private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$"); + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); @@ -611,6 +615,8 @@ public class PutSQL extends AbstractProcessor { setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); } catch (final NumberFormatException nfe) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + } catch (ParseException pe) { + throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); } } } @@ -729,7 +735,7 @@ public class PutSQL extends AbstractProcessor { * @param jdbcType the JDBC Type of the SQL parameter to set * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter */ - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException, ParseException { if (parameterValue == null) { stmt.setNull(parameterIndex, jdbcType); } else { @@ -768,7 +774,18 @@ public class PutSQL extends AbstractProcessor { stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); break; case Types.TIMESTAMP: - stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + long lTimestamp=0L; + + if(LONG_PATTERN.matcher(parameterValue).matches()){ + lTimestamp = Long.parseLong(parameterValue); + }else { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS"); + java.util.Date parsedDate = dateFormat.parse(parameterValue); + lTimestamp = parsedDate.getTime(); + } + + stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp)); + break; case Types.CHAR: case Types.VARCHAR: diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index b5ebca52c8..592d329767 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -28,6 +28,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; @@ -261,6 +263,47 @@ public class TestPutSQL { } } + @Test + public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMESTAMPTESTS (id integer primary key, ts1 timestamp, ts2 timestamp)"); + } + } + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String arg2TS = "2001-01-01 23:01:01.001"; + final String art3TS = "2002-02-02 22:02:02.002"; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS"); + java.util.Date parsedDate = dateFormat.parse(arg2TS); + + final Map attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP)); + attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); + attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP)); + attributes.put("sql.args.2.value", art3TS); + + runner.enqueue("INSERT INTO TIMESTAMPTESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTESTS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(arg2TS, rs.getString(2)); + assertEquals(art3TS, rs.getString(3)); + assertFalse(rs.next()); + } + } + } + @Test public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);