NIFI-2829 - Add Date and Time Format Support for PutSQL

Fix unit test for Date and Time type time zone problem
Enhance Time type to record milliseconds

This closes #1983.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
deonhuang 2017-07-03 14:00:22 +08:00 committed by Koji Kawamura
parent 03bff7c9fc
commit a6e94de0bb
2 changed files with 59 additions and 19 deletions

View File

@ -73,6 +73,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -119,7 +120,10 @@ import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnEr
+ "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. " + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Dates/Times/Timestamps - " + "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') " + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter.") + "as specified according to java.time.format.DateTimeFormatter. "
+ "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+ "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+ "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
}) })
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
@ -840,9 +844,6 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
date = new Date(Long.parseLong(parameterValue)); date = new Date(Long.parseLong(parameterValue));
}else { }else {
String dateFormatString = "yyyy-MM-dd"; String dateFormatString = "yyyy-MM-dd";
if (!valueFormat.isEmpty()) {
dateFormatString = valueFormat;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString); SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
java.util.Date parsedDate = dateFormat.parse(parameterValue); java.util.Date parsedDate = dateFormat.parse(parameterValue);
date = new Date(parsedDate.getTime()); date = new Date(parsedDate.getTime());
@ -863,9 +864,6 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
time = new Time(Long.parseLong(parameterValue)); time = new Time(Long.parseLong(parameterValue));
} else { } else {
String timeFormatString = "HH:mm:ss.SSS"; String timeFormatString = "HH:mm:ss.SSS";
if (!valueFormat.isEmpty()) {
timeFormatString = valueFormat;
}
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
java.util.Date parsedDate = dateFormat.parse(parameterValue); java.util.Date parsedDate = dateFormat.parse(parameterValue);
time = new Time(parsedDate.getTime()); time = new Time(parsedDate.getTime());
@ -873,7 +871,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
} else { } else {
final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter); LocalTime parsedTime = LocalTime.parse(parameterValue, dtFormatter);
time = Time.valueOf(parsedTime); LocalDateTime localDateTime = parsedTime.atDate(LocalDate.ofEpochDay(0));
Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
time = new Time(instant.toEpochMilli());
} }
stmt.setTime(parameterIndex, time); stmt.setTime(parameterIndex, time);
@ -890,7 +890,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
java.util.Date parsedDate = dateFormat.parse(parameterValue); java.util.Date parsedDate = dateFormat.parse(parameterValue);
lTimestamp = parsedDate.getTime(); lTimestamp = parsedDate.getTime();
} }
}else { } else {
final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat); final DateTimeFormatter dtFormatter = getDateTimeFormatter(valueFormat);
TemporalAccessor accessor = dtFormatter.parse(parameterValue); TemporalAccessor accessor = dtFormatter.parse(parameterValue);
java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor)); java.util.Date parsedDate = java.util.Date.from(Instant.from(accessor));

View File

@ -29,12 +29,20 @@ import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Time;
import java.sql.Types; import java.sql.Types;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.TimeZone;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
@ -467,6 +475,34 @@ public class TestPutSQL {
final String dateStr = "2002-02-02"; final String dateStr = "2002-02-02";
final String timeStr = "12:02:02"; final String timeStr = "12:02:02";
final String timeFormatString = "HH:mm:ss";
final String dateFormatString ="yyyy-MM-dd";
final DateTimeFormatter timeFormatter= DateTimeFormatter.ISO_LOCAL_TIME;
LocalTime parsedTime = LocalTime.parse(timeStr, timeFormatter);
Time expectedTime = Time.valueOf(parsedTime);
final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;
LocalDate parsedDate = LocalDate.parse(dateStr, dateFormatter);
Date expectedDate = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
final long expectedTimeInLong = expectedTime.getTime();
final long expectedDateInLong = expectedDate.getTime();
//test with time zone GMT to avoid negative value unmatched with long pattern problem.
SimpleDateFormat timeFormat = new SimpleDateFormat(timeFormatString);
timeFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
java.util.Date parsedTimeGMT = timeFormat.parse(timeStr);
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
java.util.Date parsedDateGMT = dateFormat.parse(dateStr);
Calendar gmtCalendar = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
//test with ISO LOCAL format attribute
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", timeStr); attributes.put("sql.args.1.value", timeStr);
@ -477,14 +513,16 @@ public class TestPutSQL {
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
//test Long pattern without format attribute
attributes = new HashMap<>(); attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "68522000"); attributes.put("sql.args.1.value", Long.toString(parsedTimeGMT.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.DATE)); attributes.put("sql.args.2.type", String.valueOf(Types.DATE));
attributes.put("sql.args.2.value", "1012633200000"); attributes.put("sql.args.2.value", Long.toString(parsedDateGMT.getTime()));
runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes); runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes);
//test with format attribute
attributes = new HashMap<>(); attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", "120202000"); attributes.put("sql.args.1.value", "120202000");
@ -504,18 +542,18 @@ public class TestPutSQL {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID"); final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID");
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(1, rs.getInt(1)); assertEquals(1, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime()); assertEquals(expectedTimeInLong, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime()); assertEquals(expectedDateInLong, rs.getDate(3).getTime());
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(2, rs.getInt(1)); assertEquals(2, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime()); assertEquals(parsedTimeGMT.getTime(), rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime()); assertEquals(parsedDateGMT.getTime(), rs.getDate(3,gmtCalendar).getTime());
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(3, rs.getInt(1)); assertEquals(3, rs.getInt(1));
assertEquals(68522000L, rs.getTime(2).getTime()); assertEquals(expectedTimeInLong, rs.getTime(2).getTime());
assertEquals(1012633200000L, rs.getDate(3).getTime()); assertEquals(expectedDateInLong, rs.getDate(3).getTime());
assertFalse(rs.next()); assertFalse(rs.next());
} }
@ -656,12 +694,13 @@ public class TestPutSQL {
final String art3TS = "12:02:02"; final String art3TS = "12:02:02";
final String timeFormatString = "HH:mm:ss"; final String timeFormatString = "HH:mm:ss";
SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString); SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormatString);
dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
java.util.Date parsedDate = dateFormat.parse(arg2TS); java.util.Date parsedDate = dateFormat.parse(arg2TS);
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.TIME)); attributes.put("sql.args.1.type", String.valueOf(Types.TIME));
attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
attributes.put("sql.args.2.type", String.valueOf(Types.TIME)); attributes.put("sql.args.2.type", String.valueOf(Types.TIME));
attributes.put("sql.args.2.value", art3TS); attributes.put("sql.args.2.value", art3TS);
attributes.put("sql.args.2.format", timeFormatString); attributes.put("sql.args.2.format", timeFormatString);
@ -674,9 +713,10 @@ public class TestPutSQL {
try (final Connection conn = service.getConnection()) { try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) { try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS"); final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS");
assertTrue(rs.next()); assertTrue(rs.next());
assertEquals(1, rs.getInt(1)); assertEquals(1, rs.getInt(1));
assertEquals(arg2TS, rs.getString(2)); assertEquals(arg2TS, dateFormat.format(rs.getTime(2)));
assertEquals(art3TS, rs.getString(3)); assertEquals(art3TS, rs.getString(3));
assertFalse(rs.next()); assertFalse(rs.next());
} }