diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml index d4536cd037..4db59f6078 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml @@ -122,6 +122,11 @@ language governing permissions and limitations under the License. --> jackson-databind ${jackson.version} + + org.apache.nifi + nifi-standard-record-utils + 1.9.0-SNAPSHOT + diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java index 52de42442a..d431960d3d 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java @@ -55,6 +55,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleDateFormatValidator; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -178,6 +179,38 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess .required(true) .build(); + static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() + .name("Date Format") + .description("Specifies the format to use when reading/writing Date fields. " + + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. " + + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by " + + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(new SimpleDateFormatValidator()) + .required(false) + .build(); + static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() + .name("Time Format") + .description("Specifies the format to use when reading/writing Time fields. " + + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. " + + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by " + + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(new SimpleDateFormatValidator()) + .required(false) + .build(); + static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() + .name("Timestamp Format") + .description("Specifies the format to use when reading/writing Timestamp fields. " + + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. " + + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by " + + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by " + + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(new SimpleDateFormatValidator()) + .required(false) + .build(); + private static final Set relationships; private static final List propertyDescriptors; @@ -186,6 +219,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess private final JsonFactory factory = new JsonFactory(); private volatile String nullSuppression; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; static { final Set _rels = new HashSet<>(); @@ -202,6 +238,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess descriptors.add(CHARSET); descriptors.add(INDEX_OP); descriptors.add(SUPPRESS_NULLS); + descriptors.add(DATE_FORMAT); + descriptors.add(TIME_FORMAT); + descriptors.add(TIMESTAMP_FORMAT); propertyDescriptors = Collections.unmodifiableList(descriptors); } @@ -248,6 +287,18 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess public void setup(ProcessContext context) { super.setup(context); recordPathCache = new RecordPathCache(10); + this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue(); + if (this.dateFormat == null) { + this.dateFormat = RecordFieldType.DATE.getDefaultFormat(); + } + this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue(); + if (this.timeFormat == null) { + this.timeFormat = RecordFieldType.TIME.getDefaultFormat(); + } + this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue(); + if (this.timestampFormat == null) { + this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); + } } @Override @@ -486,7 +537,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess switch (chosenDataType.getFieldType()) { case DATE: { - final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat)); if (DataTypeUtils.isLongTypeCompatible(stringValue)) { generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName)); } else { @@ -495,7 +546,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess break; } case TIME: { - final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat())); + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.timeFormat)); if (DataTypeUtils.isLongTypeCompatible(stringValue)) { generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName)); } else { @@ -504,7 +555,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess break; } case TIMESTAMP: { - final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.timestampFormat)); if (DataTypeUtils.isLongTypeCompatible(stringValue)) { generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName)); } else { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java index 2cc16c1aba..992e6159bc 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java @@ -41,6 +41,9 @@ import org.junit.Test; import java.io.IOException; import java.net.ConnectException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,18 +71,30 @@ public class TestPutElasticsearchHttpRecord { assertEquals(1, record.get("id")); assertEquals("reç1", record.get("name")); assertEquals(101, record.get("code")); + assertEquals("20/12/2018", record.get("date")); + assertEquals("6:55 PM", record.get("time")); + assertEquals("20/12/2018 6:55 PM", record.get("ts")); }, record -> { assertEquals(2, record.get("id")); assertEquals("ræc2", record.get("name")); assertEquals(102, record.get("code")); + assertEquals("20/12/2018", record.get("date")); + assertEquals("6:55 PM", record.get("time")); + assertEquals("20/12/2018 6:55 PM", record.get("ts")); }, record -> { assertEquals(3, record.get("id")); assertEquals("rèc3", record.get("name")); assertEquals(103, record.get("code")); + assertEquals("20/12/2018", record.get("date")); + assertEquals("6:55 PM", record.get("time")); + assertEquals("20/12/2018 6:55 PM", record.get("ts")); }, record -> { assertEquals(4, record.get("id")); assertEquals("rëc4", record.get("name")); assertEquals(104, record.get("code")); + assertEquals("20/12/2018", record.get("date")); + assertEquals("6:55 PM", record.get("time")); + assertEquals("20/12/2018 6:55 PM", record.get("ts")); }); runner = TestRunners.newTestRunner(processor); // no failures generateTestData(); @@ -88,6 +103,9 @@ public class TestPutElasticsearchHttpRecord { runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc"); runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status"); runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id"); + runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy"); + runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a"); + runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a"); runner.enqueue(new byte[0], new HashMap() {{ put("doc_id", "28039652140"); @@ -564,10 +582,13 @@ public class TestPutElasticsearchHttpRecord { parser.addSchemaField("id", RecordFieldType.INT); parser.addSchemaField("name", RecordFieldType.STRING); parser.addSchemaField("code", RecordFieldType.INT); + parser.addSchemaField("date", RecordFieldType.DATE); + parser.addSchemaField("time", RecordFieldType.TIME); + parser.addSchemaField("ts", RecordFieldType.TIMESTAMP); - parser.addRecord(1, "reç1", 101); - parser.addRecord(2, "ræc2", 102); - parser.addRecord(3, "rèc3", 103); - parser.addRecord(4, "rëc4", 104); + parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L)); + parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L)); + parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L)); + parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L)); } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java index 81916c65fd..affbe11e28 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java @@ -17,11 +17,9 @@ package org.apache.nifi.processors.hive; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.streaming.ConnectionError; import org.apache.hive.streaming.HiveStreamingConnection; import org.apache.hive.streaming.InvalidTable;