Mirror of https://github.com/apache/nifi.git, synced 2025-03-06 09:29:33 +00:00
NIFI-5909 added optional settings for date, time, and timestamp formats used to write Records to Elasticsearch
NIFI-5909 added content checks to the unit tests
NIFI-5937 use explicit long values for test dates/times (so the tests do not depend on the timezone of the test executor)
NIFI-5937 tabs to spaces
Fixing checkstyle violations introduced by the https://github.com/apache/nifi/pull/3249 PR
NIFI-5937 adjusted property descriptions for consistency; limited EL scope to the variable registry; added an appropriate validator along with its Maven dependency; moved format initialization to @OnScheduled
NIFI-5909 tabs to spaces

Signed-off-by: Ed <edward.berezitsky@gmail.com>

This closes #3227
This commit is contained in:
parent
1a443c73ec
commit
3e52ae952d
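To illustrate what the new Date Format / Time Format / Timestamp Format properties change, here is a minimal sketch of the formatting behaviour. It uses plain java.text.SimpleDateFormat as a stand-in for the formatter the processor obtains via DataTypeUtils.getDateFormat, assumes "yyyy-MM-dd HH:mm:ss" is the default timestamp pattern returned by RecordFieldType.TIMESTAMP.getDefaultFormat(), and pins the zone to UTC for reproducibility; the custom pattern and epoch value are taken from the updated unit test in the diff below.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class FormatPropertySketch {
    public static void main(String[] args) {
        // Fixed instant used by the updated unit test: 1545332150000L = 2018-12-20T18:55:50Z.
        Date timestamp = new Date(1545332150000L);

        // Assumed default pattern (what RecordFieldType.TIMESTAMP.getDefaultFormat() is expected to return).
        SimpleDateFormat defaultFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        // Custom pattern set via the new "Timestamp Format" property in the test below.
        SimpleDateFormat customFormat = new SimpleDateFormat("d/M/yyyy h:m a", Locale.US);

        // Pin the zone so the output does not depend on the machine's timezone.
        defaultFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        customFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        System.out.println(defaultFormat.format(timestamp)); // 2018-12-20 18:55:50
        System.out.println(customFormat.format(timestamp));  // 20/12/2018 6:55 PM
    }
}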
@@ -122,6 +122,11 @@ language governing permissions and limitations under the License. -->
             <artifactId>jackson-databind</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>

     <build>
@@ -55,6 +55,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -178,6 +179,38 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
             .required(true)
             .build();

+    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
+            .name("Date Format")
+            .description("Specifies the format to use when reading/writing Date fields. "
+                    + "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+                    + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
+            .name("Time Format")
+            .description("Specifies the format to use when reading/writing Time fields. "
+                    + "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+                    + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+            .name("Timestamp Format")
+            .description("Specifies the format to use when reading/writing Timestamp fields. "
+                    + "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+                    + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+                    + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;

@@ -186,6 +219,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     private final JsonFactory factory = new JsonFactory();

     private volatile String nullSuppression;
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;

     static {
         final Set<Relationship> _rels = new HashSet<>();
@@ -202,6 +238,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
         descriptors.add(CHARSET);
         descriptors.add(INDEX_OP);
         descriptors.add(SUPPRESS_NULLS);
+        descriptors.add(DATE_FORMAT);
+        descriptors.add(TIME_FORMAT);
+        descriptors.add(TIMESTAMP_FORMAT);

         propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
@@ -248,6 +287,18 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
     public void setup(ProcessContext context) {
         super.setup(context);
         recordPathCache = new RecordPathCache(10);
+        this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.dateFormat == null) {
+            this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
+        }
+        this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timeFormat == null) {
+            this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
+        }
+        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timestampFormat == null) {
+            this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+        }
     }

     @Override
@@ -486,7 +537,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess

         switch (chosenDataType.getFieldType()) {
             case DATE: {
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
@@ -495,7 +546,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 break;
             }
             case TIME: {
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.timeFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
@@ -504,7 +555,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                 break;
             }
             case TIMESTAMP: {
-                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.timestampFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
                 } else {
@@ -41,6 +41,9 @@ import org.junit.Test;

 import java.io.IOException;
 import java.net.ConnectException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,18 +71,30 @@ public class TestPutElasticsearchHttpRecord {
             assertEquals(1, record.get("id"));
             assertEquals("reç1", record.get("name"));
             assertEquals(101, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(2, record.get("id"));
             assertEquals("ræc2", record.get("name"));
             assertEquals(102, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(3, record.get("id"));
             assertEquals("rèc3", record.get("name"));
             assertEquals(103, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(4, record.get("id"));
             assertEquals("rëc4", record.get("name"));
             assertEquals(104, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         });
         runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
@@ -88,6 +103,9 @@ public class TestPutElasticsearchHttpRecord {
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
+        runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
+        runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, "d/M/yyyy h:m a");

         runner.enqueue(new byte[0], new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -564,10 +582,13 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("id", RecordFieldType.INT);
         parser.addSchemaField("name", RecordFieldType.STRING);
         parser.addSchemaField("code", RecordFieldType.INT);
+        parser.addSchemaField("date", RecordFieldType.DATE);
+        parser.addSchemaField("time", RecordFieldType.TIME);
+        parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);

-        parser.addRecord(1, "reç1", 101);
-        parser.addRecord(2, "ræc2", 102);
-        parser.addRecord(3, "rèc3", 103);
-        parser.addRecord(4, "rëc4", 104);
+        parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
     }
 }
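For reference, a quick sanity check (not part of the commit) of the fixed epoch values used above, computed with UTC as the reference zone; the actual strings the test asserts also depend on the zone the formatter ends up using, so this is only an illustrative sketch.

import java.time.Instant;
import java.time.LocalTime;

public class TestEpochValuesCheck {
    public static void main(String[] args) {
        // 1545282000000L -> 2018-12-20T05:00:00Z, so the "date" field falls on 20/12/2018.
        System.out.println(Instant.ofEpochMilli(1545282000000L));
        // 68150000 ms into the day -> 18:55:50, which "h:m a" renders as "6:55 PM".
        System.out.println(LocalTime.ofSecondOfDay(68150000 / 1000));
        // 1545332150000L -> 2018-12-20T18:55:50Z, which "d/M/yyyy h:m a" renders as "20/12/2018 6:55 PM".
        System.out.println(Instant.ofEpochMilli(1545332150000L));
    }
}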
@@ -17,11 +17,9 @@
 package org.apache.nifi.processors.hive;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.streaming.ConnectionError;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;