NIFI-2576 Allowing PutSQL to use timestamps in epoch or string format

This closes #869.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Peter Wicks 2016-08-15 15:50:47 -06:00 committed by Bryan Bende
parent 03d3b3961d
commit b22500d0a3
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 62 additions and 2 deletions

View File

@ -54,6 +54,8 @@ import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
@ -154,6 +156,8 @@ public class PutSQL extends AbstractProcessor {
private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$");
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
@ -611,6 +615,8 @@ public class PutSQL extends AbstractProcessor {
setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
} catch (final NumberFormatException nfe) {
throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
} catch (ParseException pe) {
throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
}
}
}
@ -729,7 +735,7 @@ public class PutSQL extends AbstractProcessor {
* @param jdbcType the JDBC Type of the SQL parameter to set
* @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
*/
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException, ParseException {
if (parameterValue == null) {
stmt.setNull(parameterIndex, jdbcType);
} else {
@ -768,7 +774,18 @@ public class PutSQL extends AbstractProcessor {
stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
break;
case Types.TIMESTAMP:
stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
long lTimestamp=0L;
if(LONG_PATTERN.matcher(parameterValue).matches()){
lTimestamp = Long.parseLong(parameterValue);
}else {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
java.util.Date parsedDate = dateFormat.parse(parameterValue);
lTimestamp = parsedDate.getTime();
}
stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp));
break;
case Types.CHAR:
case Types.VARCHAR:

View File

@ -28,6 +28,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
@ -261,6 +263,47 @@ public class TestPutSQL {
}
}
@Test
public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTESTS (id integer primary key, ts1 timestamp, ts2 timestamp)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String arg2TS = "2001-01-01 23:01:01.001";
final String art3TS = "2002-02-02 22:02:02.002";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
attributes.put("sql.args.2.value", art3TS);
runner.enqueue("INSERT INTO TIMESTAMPTESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);