NIFI-8749 Removed implicit time zone conversion to GMT

- Updated DataTypeUtils.getDateFormat() to use system default time zone
- Updated Record Path Guide to match Expression Language Guide regarding default time zone
- Updated impacted unit tests to expect localized dates instead of dates converted to GMT

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5305
This commit is contained in:
exceptionfactory 2021-08-11 13:48:13 -05:00 committed by Matthew Burgess
parent 2ab23efb74
commit 758e1850ed
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
11 changed files with 149 additions and 159 deletions

View File

@ -35,6 +35,10 @@ import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.text.DateFormat;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@ -1417,7 +1421,7 @@ public class TestRecordPath {
}
@Test
public void testFormatDateFromString() throws ParseException {
public void testFormatDateFromString() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
@ -1426,70 +1430,84 @@ public class TestRecordPath {
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("date", "2017-10-20T11:00:00Z");
final String localDateFormatted = "2017-10-20";
final String localDateTimeFormatted = String.format("%sT12:45:30", localDateFormatted);
final LocalDateTime localDateTime = LocalDateTime.parse(localDateTimeFormatted);
final int offsetHours = 6;
final String timeZone = String.format("GMT+%d:00", offsetHours);
values.put("date", localDateTimeFormatted);
final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get();
assertEquals("2017-10-20", fieldValue.getValue());
final FieldValue fieldValue2 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd' , 'GMT+8:00')")
final FieldValue fieldValue = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss\"), 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get();
assertEquals(localDateFormatted, fieldValue.getValue());
final FieldValue fieldValue2 = RecordPath.compile(String.format("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss\"), 'yyyy-MM-dd' , '%s')", timeZone))
.evaluate(record).getSelectedFields().findFirst().get();
assertEquals("2017-10-20", fieldValue2.getValue());
assertEquals(localDateFormatted, fieldValue2.getValue());
final FieldValue fieldValue3 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd HH:mm', 'GMT+8:00')")
final FieldValue fieldValue3 = RecordPath.compile(String.format("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss\"), \"yyyy-MM-dd'T'HH:mm:ss\", '%s')", timeZone))
.evaluate(record).getSelectedFields().findFirst().get();
assertEquals("2017-10-20 19:00", fieldValue3.getValue());
final FieldValue fieldValueUnchanged = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
assertEquals(DataTypeUtils.getDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").parse("2017-10-20T11:00:00Z"), fieldValueUnchanged.getValue());
final FieldValue fieldValueUnchanged2 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'INVALID' , 'INVALID')")
final ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, ZoneOffset.systemDefault());
final ZonedDateTime adjustedZoneDateTime = zonedDateTime.withZoneSameInstant(ZoneOffset.ofHours(offsetHours));
final LocalDateTime adjustedLocalDateTime = adjustedZoneDateTime.toLocalDateTime();
final String adjustedDateTime = adjustedLocalDateTime.toString();
assertEquals(adjustedDateTime, fieldValue3.getValue());
final FieldValue fieldValueUnchanged = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss\"), 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
assertEquals(localDateFormatted, fieldValueUnchanged.getValue().toString());
final FieldValue fieldValueUnchanged2 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss\"), 'INVALID' , 'INVALID')")
.evaluate(record).getSelectedFields().findFirst().get();
assertEquals(DataTypeUtils.getDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").parse("2017-10-20T11:00:00Z"), fieldValueUnchanged2.getValue());
assertEquals(localDateFormatted, fieldValueUnchanged2.getValue().toString());
}
@Test
public void testFormatDateFromLong() throws ParseException {
public void testFormatDateFromLong() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("date", RecordFieldType.LONG.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final DateFormat dateFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd");
final long dateValue = dateFormat.parse("2017-10-20").getTime();
final String localDate = "2017-10-20";
final String instantFormatted = String.format("%sT12:30:45Z", localDate);
final long epochMillis = Instant.parse(instantFormatted).toEpochMilli();
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("date", dateValue);
values.put("date", epochMillis);
final Record record = new MapRecord(schema, values);
assertEquals("2017-10-20", RecordPath.compile("format(/date, 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("2017-10-20 08:00:00", RecordPath.compile("format(/date, 'yyyy-MM-dd HH:mm:ss', 'GMT+8:00' )").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(localDate, RecordPath.compile("format(/date, 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(instantFormatted, RecordPath.compile("format(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\", 'GMT')").evaluate(record).getSelectedFields().findFirst().get().getValue());
final FieldValue fieldValueUnchanged = RecordPath.compile("format(/date, 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
assertEquals(dateValue, fieldValueUnchanged.getValue());
assertEquals(epochMillis, fieldValueUnchanged.getValue());
final FieldValue fieldValueUnchanged2 = RecordPath.compile("format(/date, 'INVALID', 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
assertEquals(dateValue, fieldValueUnchanged2.getValue());
assertEquals(epochMillis, fieldValueUnchanged2.getValue());
}
@Test
public void testFormatDateFromDate() throws ParseException {
public void testFormatDateFromDate() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final DateFormat dateFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd");
final java.util.Date utilDate = dateFormat.parse("2017-10-20");
final Date dateValue = new Date(utilDate.getTime());
final String localDate = "2017-10-20";
final String instantFormatted = String.format("%sT12:30:45Z", localDate);
final Instant instant = Instant.parse(instantFormatted);
final Date dateValue = new Date(instant.toEpochMilli());
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("date", dateValue);
final Record record = new MapRecord(schema, values);
assertEquals("2017-10-20", RecordPath.compile("format(/date, 'yyyy-MM-dd')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("2017-10-20 08:00:00", RecordPath.compile("format(/date, 'yyyy-MM-dd HH:mm:ss', 'GMT+8:00')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(localDate, RecordPath.compile("format(/date, 'yyyy-MM-dd')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(instantFormatted, RecordPath.compile("format(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\", 'GMT')").evaluate(record).getSelectedFields().findFirst().get().getValue());
final FieldValue fieldValueUnchanged = RecordPath.compile("format(/date, 'INVALID')").evaluate(record).getSelectedFields().findFirst().get();
assertEquals(dateValue, fieldValueUnchanged.getValue());
@ -1922,8 +1940,7 @@ public class TestRecordPath {
accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
return accountSchema;
return new SimpleRecordSchema(accountFields);
}
private Record createSimpleRecord() {
@ -1937,8 +1954,7 @@ public class TestRecordPath {
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
return record;
return new MapRecord(schema, values);
}
}

View File

@ -112,8 +112,6 @@ public class DataTypeUtils {
private static final Pattern FLOATING_POINT_PATTERN = Pattern.compile(doubleRegex);
private static final Pattern DECIMAL_PATTERN = Pattern.compile(decimalRegex);
private static final TimeZone GMT_TIME_ZONE = TimeZone.getTimeZone("GMT");
private static final Supplier<DateFormat> DEFAULT_DATE_FORMAT = () -> getDateFormat(RecordFieldType.DATE.getDefaultFormat());
private static final Supplier<DateFormat> DEFAULT_TIME_FORMAT = () -> getDateFormat(RecordFieldType.TIME.getDefaultFormat());
private static final Supplier<DateFormat> DEFAULT_TIMESTAMP_FORMAT = () -> getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
@ -1329,9 +1327,7 @@ public class DataTypeUtils {
}
/**
* Get Date Format using GMT Time Zone
*
* This Date Format can produce unexpected results when the system default Time Zone is not GMT
* Get Date Format using default Local Time Zone
*
* @param pattern Date Format Pattern used for new SimpleDateFormat()
* @return Date Format or null when pattern not provided
@ -1340,7 +1336,7 @@ public class DataTypeUtils {
if (pattern == null) {
return null;
}
return getDateFormat(pattern, GMT_TIME_ZONE);
return getDateFormat(pattern, TimeZone.getDefault());
}
/**

View File

@ -636,7 +636,8 @@ not the first value that is encountered in the Record itself.
=== format
Converts a Date to a String in the given format with the given time zone(optional, default time zone is GMT).
Converts a Date to a String in the given format with an optional time zone. The function defaults to the system local
time zone when the second argument is not provided.
The first argument to this function must be a Date or a Number, and the second argument must be a format String that
follows the Java SimpleDateFormat, and the third argument, optional, must be a format String that

View File

@ -48,7 +48,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -58,16 +57,13 @@ import static org.junit.Assert.assertTrue;
public class TestListS3 {
private TestRunner runner = null;
private ListS3 mockListS3 = null;
private AmazonS3Client actualS3Client = null;
private AmazonS3Client mockS3Client = null;
@Before
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockListS3 = new ListS3() {
final ListS3 mockListS3 = new ListS3() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
};
@ -161,7 +157,6 @@ public class TestListS3 {
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
final String lastModifiedString = dateFormat.format(lastModified);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
@ -370,7 +365,7 @@ public class TestListS3 {
runner.setProperty(ListS3.BUCKET, "test-bucket");
Calendar calendar = Calendar.getInstance();
calendar.set(2017, 5, 2);
calendar.set(2017, Calendar.JUNE, 2);
Date objectLastModified = calendar.getTime();
long stateCurrentTimestamp = objectLastModified.getTime();

View File

@ -52,6 +52,7 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -65,6 +66,17 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPutElasticsearchHttpRecord {
private static final int DATE_YEAR = 2018;
private static final int DATE_MONTH = 12;
private static final int DATE_DAY = 20;
private static final int TIME_HOUR = 12;
private static final int TIME_MINUTE = 55;
private static final String ISO_DATE = String.format("%d-%d-%d", DATE_YEAR, DATE_MONTH, DATE_DAY);
private static final String EXPECTED_DATE = String.format("%d/%d/%d", DATE_DAY, DATE_MONTH, DATE_YEAR);
private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE);
private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE);
private TestRunner runner;
@After
@ -76,35 +88,35 @@ public class TestPutElasticsearchHttpRecord {
public void testPutElasticSearchOnTriggerIndex() throws IOException {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("h:m a");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/YYYY h:m a");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/yyyy h:m a");
processor.setRecordChecks(record -> {
assertEquals(1, record.get("id"));
assertEquals("reç1", record.get("name"));
assertEquals(101, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
}, record -> {
assertEquals(2, record.get("id"));
assertEquals("reç2", record.get("name"));
assertEquals(102, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
}, record -> {
assertEquals(3, record.get("id"));
assertEquals("reç3", record.get("name"));
assertEquals(103, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
}, record -> {
assertEquals(4, record.get("id"));
assertEquals("reç4", record.get("name"));
assertEquals(104, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
});
runner = TestRunners.newTestRunner(processor); // no failures
generateTestData();
@ -117,9 +129,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -143,9 +153,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "create");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -163,35 +171,35 @@ public class TestPutElasticsearchHttpRecord {
public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("h:m a");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/YYYY h:m a");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("d/M/yyyy h:m a");
processor.setRecordChecks(record -> {
assertEquals(1, record.get("id"));
assertEquals("reç1", record.get("name"));
assertEquals(101, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
}, record -> {
assertEquals(2, record.get("id"));
assertEquals("reç2", record.get("name"));
assertEquals(102, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
}, record -> {
assertEquals(3, record.get("id"));
assertEquals("reç3", record.get("name"));
assertEquals(103, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
}, record -> {
assertEquals(4, record.get("id"));
assertEquals("reç4", record.get("name"));
assertEquals(104, record.get("code"));
assertEquals("20/12/2018", record.get("date"));
assertEquals(LocalTime.of(18, 55).format(timeFormatter), record.get("time"));
assertEquals(LocalDateTime.of(2018, 12, 20, 18, 55).format(dateTimeFormatter), record.get("ts"));
assertEquals(EXPECTED_DATE, record.get("date"));
assertEquals(LOCAL_TIME.format(timeFormatter), record.get("time"));
assertEquals(LOCAL_DATE_TIME.format(dateTimeFormatter), record.get("ts"));
});
runner = TestRunners.newTestRunner(processor); // no failures
generateTestData();
@ -204,9 +212,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -230,9 +236,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -251,9 +255,7 @@ public class TestPutElasticsearchHttpRecord {
runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -272,9 +274,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -293,9 +293,7 @@ public class TestPutElasticsearchHttpRecord {
runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -319,9 +317,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setVariable("es.url", "http://127.0.0.1:9200");
runner.setVariable("connect.timeout", "5s");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -340,9 +336,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "${no.attr}");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
@ -383,17 +377,13 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
runner.clearTransferState();
processor.setStatus(500, "Should retry");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_RETRY, 1);
}
@ -409,9 +399,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
}
@ -502,9 +490,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "index_fail");
runner.assertValid();
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_FAILURE, 1);
@ -527,9 +513,7 @@ public class TestPutElasticsearchHttpRecord {
// Set dynamic property, to be added to the URL as a query parameter
runner.setProperty("pipeline", "my-pipeline");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "28039652140"));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
@ -611,12 +595,12 @@ public class TestPutElasticsearchHttpRecord {
* A Test class that extends the processor in order to inject/mock behavior
*/
private static class PutElasticsearchHttpRecordTestProcessor extends PutElasticsearchHttpRecord {
int numResponseFailures = 0;
int numResponseFailures;
OkHttpClient client;
int statusCode = 200;
String statusMessage = "OK";
String expectedUrl = null;
Consumer<Map>[] recordChecks;
Consumer<Map<?, ?>>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.numResponseFailures = responseHasFailures ? 1 : 0;
@ -636,7 +620,7 @@ public class TestPutElasticsearchHttpRecord {
}
@SafeVarargs
final void setRecordChecks(Consumer<Map>... checks) {
final void setRecordChecks(Consumer<Map<?, ?>>... checks) {
recordChecks = checks;
}
@ -687,7 +671,7 @@ public class TestPutElasticsearchHttpRecord {
.protocol(Protocol.HTTP_1_1)
.code(statusCode)
.message(statusMessage)
.body(ResponseBody.create(MediaType.parse("application/json"), sb.toString()))
.body(ResponseBody.create(sb.toString(), MediaType.parse("application/json")))
.build();
when(call.execute()).thenReturn(mockResponse);
@ -778,7 +762,7 @@ public class TestPutElasticsearchHttpRecord {
}
@Test(expected = AssertionError.class)
public void testPutElasticSearchBadHostInEL() throws IOException {
public void testPutElasticSearchBadHostInEL() {
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
@ -787,9 +771,7 @@ public class TestPutElasticsearchHttpRecord {
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.assertValid();
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "1");
}});
runner.enqueue(new byte[0], Collections.singletonMap("doc_id", "1"));
runner.run();
}
@ -817,8 +799,12 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
parser.addSchemaField("amount", RecordFieldType.DECIMAL);
final Date date = Date.valueOf(ISO_DATE);
final Timestamp timestamp = Timestamp.valueOf(LOCAL_DATE_TIME);
final Time time = Time.valueOf(LOCAL_TIME);
for(int i=1; i<=numRecords; i++) {
parser.addRecord(i, "reç" + i, 100 + i, Date.valueOf("2018-12-20"), new Time(68150000), new Timestamp(1545332150000L), new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
}
}

View File

@ -21,7 +21,6 @@ import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@ -45,7 +44,10 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
@ -53,7 +55,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
@ -70,10 +71,16 @@ public class TestQueryRecord {
private static final String REL_NAME = "success";
private static final String ISO_DATE = "2018-02-04";
private static final String INSTANT_FORMATTED = String.format("%sT10:20:55Z", ISO_DATE);
private static final Instant INSTANT = Instant.parse(INSTANT_FORMATTED);
private static final Date INSTANT_DATE = Date.from(INSTANT);
private static final long INSTANT_EPOCH_MILLIS = INSTANT.toEpochMilli();
public TestRunner getRunner() {
TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
/**
/*
* we have to disable validation of expression language because the scope of the evaluation
* depends of the value of another property: if we are caching the schema/queries or not. If
* we don't disable the validation, it'll throw an error saying that the scope is incorrect.
@ -129,7 +136,11 @@ public class TestQueryRecord {
assertEquals(30, output.getValue("ageObj"));
assertArrayEquals(new String[] { "red", "green"}, (Object[]) output.getValue("colors"));
assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[]) output.getValue("names"));
assertEquals("1517702400000", output.getAsString("joinTime"));
final LocalDate localDate = LocalDate.parse(ISO_DATE);
final ZonedDateTime zonedDateTime = ZonedDateTime.of(localDate.atStartOfDay(), ZoneOffset.systemDefault());
final long epochMillis = zonedDateTime.toInstant().toEpochMilli();
assertEquals(Long.toString(epochMillis), output.getAsString("joinTime"));
assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight"));
}
@ -679,14 +690,14 @@ public class TestQueryRecord {
favorites.put("roses", "raindrops");
favorites.put("kittens", "whiskers");
final long ts = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(365 * 30);
final Map<String, Object> map = new HashMap<>();
map.put("name", "John Doe");
map.put("age", 30);
map.put("favoriteColors", new String[] { "red", "green" });
map.put("dob", new Date(ts));
map.put("dobTimestamp", ts);
map.put("joinTimestamp", "2018-02-04 10:20:55.802");
map.put("dob", INSTANT_DATE);
map.put("dobTimestamp", INSTANT_EPOCH_MILLIS);
map.put("joinTimestamp", INSTANT_FORMATTED);
map.put("weight", 180.8D);
map.put("height", 60.5);
map.put("mother", mother);
@ -696,8 +707,7 @@ public class TestQueryRecord {
personValues.put("person", person);
personValues.put("favoriteThings", favorites);
final Record record = new MapRecord(recordSchema, personValues);
return record;
return new MapRecord(recordSchema, personValues);
}
@ -722,8 +732,7 @@ public class TestQueryRecord {
map.put("id", id);
map.put("tags", Arrays.asList(tags));
final Record record = new MapRecord(recordSchema, map);
return record;
return new MapRecord(recordSchema, map);
}
/**
@ -800,9 +809,7 @@ public class TestQueryRecord {
map.put("height", 60.5);
map.put("title", "Software Engineer");
map.put("addresses", new Record[] {homeAddress, workAddress});
final Record person = new MapRecord(personSchema, map);
return person;
return new MapRecord(personSchema, map);
}
@ -834,7 +841,7 @@ public class TestQueryRecord {
}
@Test
public void testSimple() throws InitializationException, IOException, SQLException {
public void testSimple() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
@ -867,7 +874,7 @@ public class TestQueryRecord {
}
@Test
public void testNullable() throws InitializationException, IOException, SQLException {
public void testNullable() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING, true);
parser.addSchemaField("age", RecordFieldType.INT, true);
@ -902,7 +909,7 @@ public class TestQueryRecord {
}
@Test
public void testParseFailure() throws InitializationException, IOException, SQLException {
public void testParseFailure() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
@ -935,7 +942,7 @@ public class TestQueryRecord {
}
@Test
public void testNoRecordsInput() throws InitializationException, IOException, SQLException {
public void testNoRecordsInput() throws InitializationException {
TestRunner runner = getRunner();
CSVReader csvReader = new CSVReader();
@ -964,7 +971,7 @@ public class TestQueryRecord {
@Test
public void testTransformCalc() throws InitializationException, IOException, SQLException {
public void testTransformCalc() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("ID", RecordFieldType.INT);
parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
@ -1029,7 +1036,7 @@ public class TestQueryRecord {
}
@Test
public void testAggregateFunction() throws InitializationException, IOException {
public void testAggregateFunction() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("points", RecordFieldType.INT);
@ -1058,7 +1065,7 @@ public class TestQueryRecord {
}
@Test
public void testNullValueInSingleField() throws InitializationException, IOException {
public void testNullValueInSingleField() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("points", RecordFieldType.INT);
@ -1134,7 +1141,7 @@ public class TestQueryRecord {
}
@Override
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) {
final List<RecordField> recordFields = columnNames.stream()
.map(name -> new RecordField(name, RecordFieldType.STRING.getDataType()))
.collect(Collectors.toList());
@ -1178,7 +1185,7 @@ public class TestQueryRecord {
}
@Override
public WriteResult write(Record record) throws IOException {
public WriteResult write(Record record) {
return null;
}
@ -1188,11 +1195,11 @@ public class TestQueryRecord {
}
@Override
public void beginRecordSet() throws IOException {
public void beginRecordSet() {
}
@Override
public WriteResult finishRecordSet() throws IOException {
public WriteResult finishRecordSet() {
return WriteResult.EMPTY;
}
};

View File

@ -38,7 +38,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@ -78,7 +77,6 @@ public class TestAvroReaderWithEmbeddedSchema {
final int epochDay = 17260;
final String expectedTime = "2017-04-04 14:20:33.000";
final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long timeLong = df.parse(expectedTime).getTime();
final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
@ -342,7 +340,7 @@ public class TestAvroReaderWithEmbeddedSchema {
return array;
}
public static enum Status {
public enum Status {
GOOD, BAD;
}
}

View File

@ -45,7 +45,6 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -204,7 +203,6 @@ public class TestCSVRecordReader {
public void testTimeNoCoersionExpectedFormat() throws IOException, MalformedRecordException, ParseException {
final String timeFormat = "HH!mm!ss";
DateFormat dateFmt = new SimpleDateFormat(timeFormat);
dateFmt.setTimeZone(TimeZone.getTimeZone("gmt"));
final String timeVal = "19!02!03";
final String text = "time\n" + timeVal;
@ -278,7 +276,6 @@ public class TestCSVRecordReader {
public void testTimestampNoCoersionExpectedFormat() throws IOException, MalformedRecordException, ParseException {
final String timeFormat = "HH!mm!ss";
DateFormat dateFmt = new SimpleDateFormat(timeFormat);
dateFmt.setTimeZone(TimeZone.getTimeZone("gmt"));
final String timeVal = "19!02!03";
final String text = "timestamp\n" + timeVal;

View File

@ -45,7 +45,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
@ -389,7 +388,6 @@ public class TestWriteCSVResult {
private DateFormat getDateFormat(final String format) {
final DateFormat df = new SimpleDateFormat(format);
df.setTimeZone(TimeZone.getTimeZone("gmt"));
return df;
}

View File

@ -51,7 +51,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
@ -77,7 +76,6 @@ public class TestWriteJsonResult {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long time = df.parse("2017/01/01 17:00:00.000").getTime();
final Map<String, Object> map = new LinkedHashMap<>();

View File

@ -46,7 +46,6 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static org.apache.nifi.record.NullSuppression.ALWAYS_SUPPRESS;
import static org.apache.nifi.record.NullSuppression.NEVER_SUPPRESS;
@ -171,7 +170,6 @@ public class TestWriteXMLResult {
final RecordSchema schema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_RECORD);
final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("gmt"));
final long time = df.parse("2017/01/01 17:00:00.000").getTime();
final String date = "2017-01-01";