NIFI-2829 Date and Time Format Support for PutSQL

This closes #1524.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
patricker 2017-02-21 13:22:04 -07:00 committed by Koji Kawamura
parent 3da8b94ddd
commit 03bff7c9fc
2 changed files with 208 additions and 7 deletions

View File

@ -72,6 +72,9 @@ import java.sql.Types;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalAccessor;
import java.util.ArrayList; import java.util.ArrayList;
@ -110,11 +113,13 @@ import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnEr
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases " + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types and timestamps. For binary data types " + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format " + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all " + "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ "letters in upper case and no '0x' at the beginning. For timestamps, the format can be specified according to java.time.format.DateTimeFormatter." + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Customer and named patterns are accepted i.e. ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME')") + "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter.")
}) })
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
@ -828,10 +833,50 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
break; break;
case Types.DATE: case Types.DATE:
stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); Date date;
if (valueFormat.equals("")) {
if(LONG_PATTERN.matcher(parameterValue).matches()){
date = new Date(Long.parseLong(parameterValue));
}else {
String dateFormatString = "yyyy-MM-dd";
if (!valueFormat.isEmpty()) {
dateFormatString = valueFormat;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
java.util.Date parsedDate = dateFormat.parse(parameterValue);
date = new Date(parsedDate.getTime());
}
} else {
final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
LocalDate parsedDate = LocalDate.parse(parameterValue, dtFormatter);
date = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
}
stmt.setDate(parameterIndex, date);
break; break;
case Types.TIME: case Types.TIME:
stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); Time time;
if (valueFormat.equals("")) {
if (LONG_PATTERN.matcher(parameterValue).matches()) {
time = new Time(Long.parseLong(parameterValue));
} else {
String timeFormatString = "HH:mm:ss.SSS";
if (!valueFormat.isEmpty()) {
timeFormatString = valueFormat;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedDate = dateFormat.parse(parameterValue);
time = new Time(parsedDate.getTime());
}
} else {
final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter);
time = Time.valueOf(parsedTime);
}
stmt.setTime(parameterIndex, time);
break; break;
case Types.TIMESTAMP: case Types.TIMESTAMP:
long lTimestamp=0L; long lTimestamp=0L;

View File

@ -451,6 +451,77 @@ public class TestPutSQL {
} }
} }
@Test
public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String dateStr = "2002-02-02";
final String timeStr = "12:02:02";
Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", timeStr);
attributes.put("sql.args.1.format", "ISO_LOCAL_TIME");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", dateStr);
attributes.put("sql.args.2.format", "ISO_LOCAL_DATE");
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "68522000");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", "1012633200000");
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes);
attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "120202000");
attributes.put("sql.args.1.format", "HHmmssSSS");
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", "20020202");
attributes.put("sql.args.2.format", "yyyyMMdd");
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime());
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime());
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime());
assertFalse(rs.next());
}
}
}
@Test @Test
public void testBitType() throws SQLException, InitializationException { public void testBitType() throws SQLException, InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@ -568,6 +639,91 @@ public class TestPutSQL {
} }
@Test
public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String arg2TS = "00:01:01";
final String art3TS = "12:02:02";
final String timeFormatString = "HH:mm:ss";
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.TIME));
attributes.put("sql.args.2.value", art3TS);
attributes.put("sql.args.2.format", timeFormatString);
runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}
@Test
public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String arg2TS = "2001-01-01";
final String art3TS = "2002-02-02";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.DATE));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", art3TS);
runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM DATETESTS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2));
assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next());
}
}
}
@Test @Test
public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException { public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);