NIFI-7990: Add properties to map Record field as @timestamp in output to Elasticsearch for PutElasticsearchRecord and PutElasticsearchHttpRecord processors; NIFI-7474 allow mapped id field to be retained within the Record for PutElasticsearchRecord

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #4691.
Chris Sampson 2021-09-02 11:57:57 +01:00 committed by Joe Gresock
parent 9d578e63fd
commit 3892e50991
6 changed files with 664 additions and 155 deletions

IndexOperationRequest.java

@@ -65,9 +65,9 @@ public class IndexOperationRequest {
Index("index"),
Update("update"),
Upsert("upsert");
String value;
private final String value;
Operation(String value) {
Operation(final String value) {
this.value = value;
}
@@ -80,6 +80,10 @@ public class IndexOperationRequest {
.filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
}
public static String[] allValues() {
return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
}
}
@Override
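
For illustration (not part of the commit), a minimal sketch of how the reworked enum behaves: forValue is a case-insensitive lookup and the new allValues() returns the sorted operation names. The constants follow the operation list (create, delete, index, update, upsert) used in the processor descriptions; the demo class and main method are editorial scaffolding.

import java.util.Arrays;

public class OperationDemo {
    // Mirrors IndexOperationRequest.Operation as shown above
    enum Operation {
        Create("create"), Delete("delete"), Index("index"), Update("update"), Upsert("upsert");

        private final String value;

        Operation(final String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        public static Operation forValue(final String value) {
            return Arrays.stream(Operation.values())
                    .filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
                    .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
        }

        public static String[] allValues() {
            return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
        }
    }

    public static void main(final String[] args) {
        System.out.println(Operation.forValue("INDEX"));            // Index (lookup ignores case)
        System.out.println(Arrays.toString(Operation.allValues())); // [create, delete, index, update, upsert]
    }
}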

PutElasticsearchHttpRecord.java

@@ -66,7 +66,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
@@ -78,13 +77,13 @@ import java.io.OutputStream;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -153,7 +152,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("Identifier Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", "
.description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\" or \"create\", "
+ "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+ "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.")
.required(false)
@@ -209,12 +208,31 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
.required(true)
.build();
static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp")
.displayName("@timestamp Value")
.description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-path")
.displayName("@timestamp Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. " +
"If left blank the @timestamp will be determined using the main @timestamp property")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
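
A usage sketch of how the two new properties combine (mirroring the test setup later in this commit; record reader and writer configuration omitted): the Record Path wins whenever it selects a non-null field, and the literal @timestamp Value is the per-FlowFile fallback.

final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, "${timestamp}");    // fallback, evaluated against FlowFile attributes
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/ts"); // preferred when /ts selects a non-null value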
static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("Date Format")
.description("Specifies the format to use when reading/writing Date fields. "
+ "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@@ -235,7 +253,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
+ "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@@ -266,6 +284,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);
descriptors.add(ID_RECORD_PATH);
descriptors.add(AT_TIMESTAMP_RECORD_PATH);
descriptors.add(AT_TIMESTAMP);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(INDEX_OP);
@@ -292,7 +312,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
// Since Expression Language is allowed for index operation, we can't guarantee that we can catch
// all invalid configurations, but we should catch them as soon as we can. For example, if the
// Identifier Record Path property is empty, the Index Operation must evaluate to "index".
// Identifier Record Path property is empty, the Index Operation must evaluate to "index" or "create".
String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
String indexOp = validationContext.getProperty(INDEX_OP).getValue();
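
In effect, the reachable validation is (a sketch of the rule, not code from the commit):

// Identifier Record Path unset and Index Operation known statically:
//   "index" or "create"            -> valid, Elasticsearch auto-generates the document ID
//   "update", "upsert" or "delete" -> invalid, these operations require an ID
// Index Operation using Expression Language -> deferred to onTrigger, per the comment above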
@@ -363,7 +383,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
if (StringUtils.isEmpty(baseUrl)) {
throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
}
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(baseUrl)).newBuilder().addPathSegment("_bulk");
// Find the user-added properties and set them as query parameters on the URL
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
@@ -378,14 +398,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(index)) {
logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile});
logger.error("No value for index in for {}, transferring to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(indexOp)) {
logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
logger.error("No Index operation specified for {}, transferring to failure.", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -398,18 +418,22 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
case "delete":
break;
default:
logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
logger.error("Index operation {} not supported for {}, transferring to failure.", indexOp, flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null : recordPathCache.getCompiled(id_path);
final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(idPath) ? null : recordPathCache.getCompiled(idPath);
final StringBuilder sb = new StringBuilder();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath atPath = StringUtils.isEmpty(atTimestampPath) ? null : recordPathCache.getCompiled(atTimestampPath);
int recordCount = 0;
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
@@ -428,6 +452,14 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
id = null;
}
final Object timestamp;
if (atPath != null) {
final Optional<FieldValue> atPathValue = atPath.evaluate(record).getSelectedFields().findFirst();
timestamp = !atPathValue.isPresent() || atPathValue.get().getValue() == null ? atTimestamp : atPathValue.get();
} else {
timestamp = atTimestamp;
}
// The ID must be valid for all operations except "index" or "create". For that case,
// a missing ID indicates one is to be auto-generated by Elasticsearch
if (id == null && !(indexOp.equalsIgnoreCase("index") || indexOp.equalsIgnoreCase("create"))) {
@@ -437,8 +469,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final StringBuilder json = new StringBuilder();
ByteArrayOutputStream out = new ByteArrayOutputStream();
JsonGenerator generator = factory.createJsonGenerator(out);
writeRecord(record, record.getSchema(), generator);
JsonGenerator generator = factory.createGenerator(out);
writeRecord(record, generator, timestamp);
generator.flush();
generator.close();
json.append(out.toString(charset.name()));
@@ -447,7 +479,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
recordCount++;
}
} catch (IdentifierNotFoundException infe) {
logger.error(infe.getMessage(), new Object[]{flowFile});
logger.error(infe.getMessage(), flowFile);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
@@ -459,7 +491,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
return;
}
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
RequestBody requestBody = RequestBody.create(sb.toString(), MediaType.parse("application/json"));
final Response getResponse;
try {
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
@@ -474,49 +506,50 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final Set<Integer> failures = new HashSet<>();
if (isSuccess(statusCode)) {
try (ResponseBody responseBody = getResponse.body()) {
final byte[] bodyBytes = responseBody.bytes();
try (final ResponseBody responseBody = getResponse.body()) {
if (responseBody != null) {
final byte[] bodyBytes = responseBody.bytes();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
boolean errors = responseJson.get("errors").asBoolean(false);
// ES has no rollback, so if errors occur, log them and route the whole flow file to failure
if (errors) {
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if(itemNodeArray != null) {
if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, moving each to success or failure accordingly,
// but only keep the first error for logging
String errorReason = null;
for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i);
int status = itemNode.findPath("status").asInt();
if (!isSuccess(status)) {
if (errorReason == null || logAllErrors) {
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
String reason = itemNode.findPath("result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("reason").asText();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
boolean errors = responseJson.get("errors").asBoolean(false);
// ES has no rollback, so if errors occur, log them and route the whole flow file to failure
if (errors) {
ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items");
if (itemNodeArray != null) {
if (itemNodeArray.size() > 0) {
// All items are returned whether they succeeded or failed, so iterate through the item array
// at the same time as the flow file list, moving each to success or failure accordingly,
// but only keep the first error for logging
String errorReason = null;
for (int i = itemNodeArray.size() - 1; i >= 0; i--) {
JsonNode itemNode = itemNodeArray.get(i);
int status = itemNode.findPath("status").asInt();
if (!isSuccess(status)) {
if (errorReason == null || logAllErrors) {
// Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason
String reason = itemNode.findPath("result").asText();
if (StringUtils.isEmpty(reason)) {
// If there was no result, we expect an error with a string description in the "reason" field
reason = itemNode.findPath("reason").asText();
}
errorReason = reason;
logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
i, flowFile, errorReason);
}
errorReason = reason;
logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure",
new Object[]{i, flowFile, errorReason});
failures.add(i);
}
failures.add(i);
}
}
}
} else {
// Everything succeeded, route FF and end
flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, url.toString());
return;
}
} else {
// Everything succeeded, route FF and end
flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, url.toString());
return;
}
} catch (IOException ioe) {
// Something went wrong when parsing the response, log the error and route to failure
logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
@@ -529,12 +562,12 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
} else if (statusCode / 100 == 5) {
// 5xx -> RETRY, but a server error might last a while, so yield
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
new Object[]{statusCode, getResponse.message()});
statusCode, getResponse.message());
session.transfer(flowFile, REL_RETRY);
context.yield();
return;
} else { // 1xx, 3xx, 4xx, etc. -> NO RETRY
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", statusCode, getResponse.message());
session.transfer(flowFile, REL_FAILURE);
return;
}
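
For reference, the shape of one Bulk API "items" entry that the parsing above inspects (field names follow the Elasticsearch Bulk response; the values are illustrative):

// { "index": { "_index": "doc", "status": 404, "result": "not_found" } }
//   -> reason taken from "result"
// { "index": { "_index": "doc", "status": 400, "error": { "type": "mapper_parsing_exception", "reason": "..." } } }
//   -> no "result", so reason taken from "reason"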
@@ -549,17 +582,16 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
final RecordSetWriterFactory writerFactory = writerFactoryOptional.get();
// We know there are a mixture of successes and failures, create FFs for each and rename input FF to avoid confusion.
final FlowFile inputFlowFile = flowFile;
final FlowFile successFlowFile = session.create(inputFlowFile);
final FlowFile failedFlowFile = session.create(inputFlowFile);
final FlowFile successFlowFile = session.create(flowFile);
final FlowFile failedFlowFile = session.create(flowFile);
// Set up the reader and writers
try (final OutputStream successOut = session.write(successFlowFile);
final OutputStream failedOut = session.write(failedFlowFile);
final InputStream in = session.read(inputFlowFile);
final RecordReader reader = readerFactory.createRecordReader(inputFlowFile, in, getLogger())) {
final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
final RecordSchema schema = writerFactory.getSchema(inputFlowFile.getAttributes(), reader.getSchema());
final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
try (final RecordSetWriter successWriter = writerFactory.createWriter(getLogger(), schema, successOut, successFlowFile);
final RecordSetWriter failedWriter = writerFactory.createWriter(getLogger(), schema, failedOut, failedFlowFile)) {
@@ -581,8 +613,8 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
}
} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
// We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
session.transfer(inputFlowFile, REL_FAILURE);
getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
if (successFlowFile != null) {
session.remove(successFlowFile);
}
@@ -598,15 +630,34 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
session.transfer(successFlowFile, REL_SUCCESS);
session.transfer(failedFlowFile, REL_FAILURE);
session.remove(inputFlowFile);
session.remove(flowFile);
}
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator)
throws IOException {
RecordSchema schema = record.getSchema();
private void writeRecord(final Record record, final JsonGenerator generator, final Object atTimestamp) throws IOException {
final RecordSchema schema = record.getSchema();
generator.writeStartObject();
if (atTimestamp != null && !(atTimestamp instanceof String && StringUtils.isBlank((String) atTimestamp))) {
final DataType atDataType;
final Object atValue;
if (atTimestamp instanceof FieldValue) {
final FieldValue atField = (FieldValue) atTimestamp;
atDataType = atField.getField().getDataType();
atValue = atField.getValue();
} else {
atDataType = RecordFieldType.STRING.getDataType();
atValue = atTimestamp.toString();
}
final Object outputValue = RecordFieldType.STRING.getDataType().equals(atDataType) ? coerceTimestampStringToLong(atValue.toString()) : atValue;
final DataType outputDataType = outputValue.equals(atValue) ? atDataType : RecordFieldType.LONG.getDataType();
generator.writeFieldName("@timestamp");
writeValue(generator, outputValue, "@timestamp", outputDataType);
}
for (int i = 0; i < schema.getFieldCount(); i++) {
final RecordField field = schema.getField(i);
final String fieldName = field.getFieldName();
@@ -627,6 +678,12 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
generator.writeEndObject();
}
private Object coerceTimestampStringToLong(final String stringValue) {
return DataTypeUtils.isLongTypeCompatible(stringValue)
? DataTypeUtils.toLong(stringValue, "@timestamp")
: stringValue;
}
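
A worked sketch of the helper above (DataTypeUtils.isLongTypeCompatible and DataTypeUtils.toLong are existing NiFi record utilities; the values are illustrative):

coerceTimestampStringToLong("1000");                     // -> 1000L, later written as a JSON number
coerceTimestampStringToLong("2020-11-27T14:37:00.000Z"); // -> the same String, later written as a JSON string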
@SuppressWarnings("unchecked")
private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
if (value == null) {
@@ -646,8 +703,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
switch (chosenDataType.getFieldType()) {
case DATE: {
// Use SimpleDateFormat with system default time zone for string conversion
final String stringValue = DataTypeUtils.toString(coercedValue, () -> new SimpleDateFormat(dateFormat));
final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(this.dateFormat));
if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
generator.writeNumber(DataTypeUtils.toLong(coercedValue, fieldName));
} else {
@@ -711,13 +767,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
generator.writeString(stringValue);
}
break;
case RECORD: {
final Record record = (Record) coercedValue;
final RecordDataType recordDataType = (RecordDataType) chosenDataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
writeRecord(record, childSchema, generator);
case RECORD:
writeRecord((Record) coercedValue, generator, null);
break;
}
case MAP: {
final MapDataType mapDataType = (MapDataType) chosenDataType;
final DataType valueDataType = mapDataType.getValueType();

TestPutElasticsearchHttpRecord.java

@@ -49,6 +49,7 @@ import java.net.ConnectException;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@@ -56,9 +57,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -75,6 +78,7 @@ public class TestPutElasticsearchHttpRecord {
private static final String ISO_DATE = String.format("%d-%d-%d", DATE_YEAR, DATE_MONTH, DATE_DAY);
private static final String EXPECTED_DATE = String.format("%d/%d/%d", DATE_DAY, DATE_MONTH, DATE_YEAR);
private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE);
private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY);
private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE);
private TestRunner runner;
@@ -498,6 +502,141 @@ public class TestPutElasticsearchHttpRecord {
assertNotNull(out);
}
@Test
public void testPutElasticsearchOnTriggerWithNoAtTimestampPath() throws Exception {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
runner = TestRunners.newTestRunner(processor);
generateTestData(1);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.removeProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP); // no default
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/none"); // Field does not exist
processor.setRecordChecks(record -> assertTimestamp(record, null)); // no @timestamp
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
runner.clearTransferState();
// now add a default @timestamp
final String timestamp = "2020-11-27T14:37:00.000Z";
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, timestamp);
processor.setRecordChecks(record -> assertTimestamp(record, timestamp)); // @timestamp defaulted
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out2);
}
@Test
public void testPutElasticsearchOnTriggerWithAtTimestampFromAttribute() throws IOException {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
runner = TestRunners.newTestRunner(processor);
generateTestData(1);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "${i}");
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP, "${timestamp}");
final String timestamp = "2020-11-27T15:10:00.000Z";
processor.setRecordChecks(record -> assertTimestamp(record, timestamp));
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652144");
put("i", "doc");
put("timestamp", timestamp);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out);
runner.clearTransferState();
// Now try an empty attribute value, should be no timestamp
processor.setRecordChecks(record -> assertTimestamp(record, null));
runner.enqueue(new byte[0], new HashMap<String, String>() {{
put("doc_id", "28039652144");
put("i", "doc");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
assertNotNull(out2);
}
@Test
public void testPutElasticsearchOnTriggerWithAtTimestampPath() throws Exception {
PutElasticsearchHttpRecordTestProcessor processor = new PutElasticsearchHttpRecordTestProcessor(false);
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat());
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat());
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat());
runner = TestRunners.newTestRunner(processor);
generateTestData(1);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/ts"); // TIMESTAMP
processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE_TIME.format(dateTimeFormatter)));
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
runner.clearTransferState();
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/date"); // DATE;
processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_DATE.format(dateFormatter)));
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
runner.clearTransferState();
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/time"); // TIME
processor.setRecordChecks(record -> assertTimestamp(record, LOCAL_TIME.format(timeFormatter)));
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
runner.clearTransferState();
// these INT/STRING values might not make sense from an Elasticsearch point of view,
// but we want to prove we can handle them being selected from the Record
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/code"); // INT
processor.setRecordChecks(record -> assertTimestamp(record, 101));
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
runner.clearTransferState();
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/name"); // STRING
processor.setRecordChecks(record -> assertTimestamp(record, "reç1"));
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
runner.clearTransferState();
runner.setProperty(PutElasticsearchHttpRecord.AT_TIMESTAMP_RECORD_PATH, "/coerce"); // STRING coerced to LONG
processor.setRecordChecks(record -> assertTimestamp(record, 1000));
runner.enqueue(new byte[0]);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
assertNotNull(runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0));
runner.clearTransferState();
}
@Test
public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
PutElasticsearchHttpRecordTestProcessor p = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
@@ -600,7 +739,7 @@ public class TestPutElasticsearchHttpRecord {
int statusCode = 200;
String statusMessage = "OK";
String expectedUrl = null;
Consumer<Map<?, ?>>[] recordChecks;
Consumer<Map<String, Object>>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.numResponseFailures = responseHasFailures ? 1 : 0;
@@ -620,10 +759,11 @@ public class TestPutElasticsearchHttpRecord {
}
@SafeVarargs
final void setRecordChecks(Consumer<Map<?, ?>>... checks) {
final void setRecordChecks(Consumer<Map<String, Object>>... checks) {
recordChecks = checks;
}
@SuppressWarnings("unchecked")
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class);
@@ -636,7 +776,7 @@ public class TestPutElasticsearchHttpRecord {
if (recordChecks != null) {
final ObjectMapper mapper = new ObjectMapper();
Buffer sink = new Buffer();
realRequest.body().writeTo(sink);
Objects.requireNonNull(realRequest.body()).writeTo(sink);
String line;
int recordIndex = 0;
boolean content = false;
@@ -798,13 +938,13 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("time", RecordFieldType.TIME);
parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
parser.addSchemaField("amount", RecordFieldType.DECIMAL);
parser.addSchemaField("coerce", RecordFieldType.STRING);
final Date date = Date.valueOf(ISO_DATE);
final Timestamp timestamp = Timestamp.valueOf(LOCAL_DATE_TIME);
final Time time = Time.valueOf(LOCAL_TIME);
for(int i=1; i<=numRecords; i++) {
parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
parser.addRecord(i, "reç" + i, 100 + i, date, time, timestamp, new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN), "1000");
}
}
@@ -818,4 +958,12 @@ public class TestPutElasticsearchHttpRecord {
runner.enableControllerService(writer);
runner.setProperty(PutElasticsearchHttpRecord.RECORD_WRITER, "writer");
}
private void assertTimestamp(final Map<String, Object> record, final Object timestamp) {
if (timestamp == null) {
assertFalse(record.containsKey("@timestamp"));
} else {
assertEquals(timestamp, record.get("@timestamp"));
}
}
}

nifi-elasticsearch-restapi-processors/pom.xml

@@ -20,12 +20,6 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-elasticsearch-restapi-processors</artifactId>
<packaging>jar</packaging>
<properties>
<slf4jversion>2.7</slf4jversion>
<es.version>5.6.6</es.version>
<lucene.version>6.2.1</lucene.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -111,6 +105,12 @@ language governing permissions and limitations under the License. -->
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-record-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>

PutElasticsearchRecord.java

@@ -50,8 +50,11 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.InputStream;
@@ -89,23 +92,33 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.build();
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
.name("put-es-record-index-op")
.displayName("Index Operation")
.description("The type of the operation used to index (create, delete, index, update, upsert)")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue(IndexOperationRequest.Operation.Index.getValue())
.required(true)
.build();
.name("put-es-record-index-op")
.displayName("Index Operation")
.description("The type of the operation used to index (create, delete, index, update, upsert)")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue(IndexOperationRequest.Operation.Index.getValue())
.required(true)
.build();
static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp")
.displayName("@timestamp Value")
.description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
static final PropertyDescriptor INDEX_OP_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-index-op-path")
.displayName("Index Operation Record Path")
.description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank " +
"the Index Operation will be determined using the main Index Operation property.")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
.name("put-es-record-index-op-path")
.displayName("Index Operation Record Path")
.description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank " +
"the Index Operation will be determined using the main Index Operation property.")
.addValidator(new RecordPathValidator())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
@@ -113,6 +126,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
"the ID will be automatically generated by Elasticsearch.")
.addValidator(new RecordPathValidator())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder()
.name("put-es-record-retain-id-field")
.displayName("Retain ID (Record Path)")
.description("Whether to retain the existing field used as the ID Record Path.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.dependsOn(ID_RECORD_PATH)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
@@ -136,6 +162,27 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-path")
.displayName("@timestamp Record Path")
.description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. " +
"If left blank the @timestamp will be determined using the main @timestamp property")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor RETAIN_AT_TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
.name("put-es-record-retain-at-timestamp-field")
.displayName("Retain @timestamp (Record Path)")
.description("Whether to retain the existing field used as the @timestamp Record Path.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
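
An illustrative sketch of the two retain switches (record shape borrowed from the tests later in this commit; _id is the bulk request metadata). Note that a "removed" field is nulled in the document rather than dropped, per getFromRecordPath below.

// Input record: { "id": "rec-1", "msg": "Hello", "ts": 1606486523000 }
// with ID_RECORD_PATH = /id and AT_TIMESTAMP_RECORD_PATH = /ts:
//   retain flags false (default) -> _id "rec-1", body { "id": null, "msg": "Hello", "ts": null, "@timestamp": <from /ts> }
//   retain flags true            -> _id "rec-1", body { "id": "rec-1", "msg": "Hello", "ts": 1606486523000, "@timestamp": <from /ts> }
runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true");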
static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("put-es-record-error-writer")
.displayName("Error Record Writer")
@@ -147,9 +194,51 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
.required(false)
.build();
static final PropertyDescriptor AT_TIMESTAMP_DATE_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-date-format")
.displayName("@Timestamp Record Path Date Format")
.description("Specifies the format to use when writing Date field for @timestamp. "
+ "If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final PropertyDescriptor AT_TIMESTAMP_TIME_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-time-format")
.displayName("@Timestamp Record Path Time Format")
.description("Specifies the format to use when writing Time field for @timestamp. "
+ "If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final PropertyDescriptor AT_TIMESTAMP_TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.name("put-es-record-at-timestamp-timestamp-format")
.displayName("@Timestamp Record Path Timestamp Format")
.description("Specifies the format to use when writing Timestamp field for @timestamp. "
+ "If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SimpleDateFormatValidator())
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
.build();
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
INDEX_OP, INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_OP_RECORD_PATH,
INDEX_RECORD_PATH, TYPE_RECORD_PATH, LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
AT_TIMESTAMP_DATE_FORMAT, AT_TIMESTAMP_TIME_FORMAT, AT_TIMESTAMP_TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES,
ERROR_RECORD_WRITER
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
@@ -170,6 +259,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
private ElasticSearchClientService clientService;
private RecordSetWriterFactory writerFactory;
private boolean logErrors;
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
@OnScheduled
public void onScheduled(ProcessContext context) {
@@ -178,6 +270,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
this.recordPathCache = new RecordPathCache(16);
this.writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
this.dateFormat = context.getProperty(AT_TIMESTAMP_DATE_FORMAT).evaluateAttributeExpressions().getValue();
if (this.dateFormat == null) {
this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
}
this.timeFormat = context.getProperty(AT_TIMESTAMP_TIME_FORMAT).evaluateAttributeExpressions().getValue();
if (this.timeFormat == null) {
this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
}
this.timestampFormat = context.getProperty(AT_TIMESTAMP_TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
if (this.timestampFormat == null) {
this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
}
}
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
@@ -214,7 +319,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
public void onTrigger(ProcessContext context, ProcessSession session) {
FlowFile input = session.get();
if (input == null) {
return;
@@ -223,24 +328,22 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(input).getValue();
final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).isSet()
? context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
: null;
final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
: null;
final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
: null;
final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
: null;
final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final String indexPath = context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final String typePath = context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null;
RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null;
boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
List<FlowFile> badRecords = new ArrayList<>();
@@ -252,13 +355,16 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
List<Record> originals = new ArrayList<>();
while ((record = reader.nextRecord()) != null) {
final String idx = getFromRecordPath(record, iPath, index);
final String t = getFromRecordPath(record, tPath, type);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp));
final String id = path != null ? getFromRecordPath(record, path, null) : null;
final String idx = getFromRecordPath(record, iPath, index, false);
final String t = getFromRecordPath(record, tPath, type, false);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
final String id = getFromRecordPath(record, path, null, retainId);
final Object timestamp = getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);
@SuppressWarnings("unchecked")
Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils
.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
contentMap.putIfAbsent("@timestamp", timestamp);
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
originals.add(record);
@@ -275,7 +381,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
}
if (operationList.size() > 0) {
if (!operationList.isEmpty()) {
BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
FlowFile bad = indexDocuments(bundle, session, input);
if (bad != null) {
@@ -315,7 +421,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
List<Map<String, Object>> errors = response.getItems();
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));
String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", mapper.writeValueAsString(errors));
if (logErrors) {
getLogger().error(output);
@@ -326,26 +432,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
if (writerFactory != null) {
FlowFile errorFF = session.create(input);
try (OutputStream os = session.write(errorFF);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
try {
int added = 0;
writer.beginRecordSet();
for (int index = 0; index < response.getItems().size(); index++) {
Map<String, Object> current = response.getItems().get(index);
if (!current.isEmpty()) {
String key = current.keySet().iterator().next();
@SuppressWarnings("unchecked")
Map<String, Object> inner = (Map<String, Object>) current.get(key);
if (inner.containsKey("error")) {
writer.write(bundle.getOriginalRecords().get(index));
added++;
try (OutputStream os = session.write(errorFF);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
writer.beginRecordSet();
for (int index = 0; index < response.getItems().size(); index++) {
Map<String, Object> current = response.getItems().get(index);
if (!current.isEmpty()) {
String key = current.keySet().stream().findFirst().orElse(null);
@SuppressWarnings("unchecked")
Map<String, Object> inner = (Map<String, Object>) current.get(key);
if (inner != null && inner.containsKey("error")) {
writer.write(bundle.getOriginalRecords().get(index));
added++;
}
}
}
writer.finishRecordSet();
}
writer.finishRecordSet();
writer.close();
os.close();
errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
@@ -362,7 +468,8 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
return null;
}
private String getFromRecordPath(final Record record, final RecordPath path, final String fallback) {
private String getFromRecordPath(final Record record, final RecordPath path, final String fallback,
final boolean retain) {
if (path == null) {
return fallback;
}
@@ -377,11 +484,97 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
);
}
fieldValue.updateValue(null);
if (!retain) {
fieldValue.updateValue(null);
}
return fieldValue.getValue().toString();
} else {
return fallback;
}
}
private Object getTimestampFromRecordPath(final Record record, final RecordPath path, final String fallback,
final boolean retain) {
if (path == null) {
return coerceStringToLong("@timestamp", fallback);
}
final RecordPathResult result = path.evaluate(record);
final Optional<FieldValue> value = result.getSelectedFields().findFirst();
if (value.isPresent() && value.get().getValue() != null) {
final FieldValue fieldValue = value.get();
final DataType dataType = fieldValue.getField().getDataType();
final String fieldName = fieldValue.getField().getFieldName();
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType)
: dataType;
final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
if (coercedValue == null) {
return null;
}
final Object returnValue;
switch (chosenDataType.getFieldType()) {
case DATE:
case TIME:
case TIMESTAMP:
final String format = determineDateFormat(chosenDataType.getFieldType());
returnValue = coerceStringToLong(
fieldName,
DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(format))
);
break;
case LONG:
returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
break;
case INT:
case BYTE:
case SHORT:
returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
break;
case CHAR:
case STRING:
returnValue = coerceStringToLong(fieldName, coercedValue.toString());
break;
case BIGINT:
returnValue = coercedValue;
break;
default:
throw new ProcessException(
String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath())
);
}
if (!retain) {
fieldValue.updateValue(null);
}
return returnValue;
} else {
return coerceStringToLong("@timestamp", fallback);
}
}
private String determineDateFormat(final RecordFieldType recordFieldType) {
final String format;
switch (recordFieldType) {
case DATE:
format = this.dateFormat;
break;
case TIME:
format = this.timeFormat;
break;
default:
format = this.timestampFormat;
}
return format;
}
private Object coerceStringToLong(final String fieldName, final String stringValue) {
return DataTypeUtils.isLongTypeCompatible(stringValue)
? DataTypeUtils.toLong(stringValue, fieldName)
: stringValue;
}
}
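
Taken together, getTimestampFromRecordPath normalises the selected field before it reaches the document; a summary of outcomes per the switch above and the tests in this commit:

// DATE/TIME/TIMESTAMP fields -> formatted with the configured @Timestamp format, then numeric strings coerced to Long
// LONG -> Long; INT/BYTE/SHORT -> Integer; BIGINT -> unchanged
// STRING/CHAR "1000" -> 1000L; STRING "reç1" -> "reç1"
// any other type (e.g. ARRAY) -> ProcessException
// no Record Path match -> the fallback @timestamp Value, itself coerced via coerceStringToLong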

PutElasticsearchRecordTest.groovy

@@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
@@ -35,10 +36,29 @@ import org.junit.Assert
import org.junit.Before
import org.junit.Test
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class PutElasticsearchRecordTest {
private static final int DATE_YEAR = 2020
private static final int DATE_MONTH = 11
private static final int DATE_DAY = 27
private static final int TIME_HOUR = 12
private static final int TIME_MINUTE = 55
private static final int TIME_SECOND = 23
private static final LocalDateTime LOCAL_DATE_TIME = LocalDateTime.of(DATE_YEAR, DATE_MONTH, DATE_DAY, TIME_HOUR, TIME_MINUTE, TIME_SECOND)
private static final LocalDate LOCAL_DATE = LocalDate.of(DATE_YEAR, DATE_MONTH, DATE_DAY)
private static final LocalTime LOCAL_TIME = LocalTime.of(TIME_HOUR, TIME_MINUTE, TIME_SECOND)
MockBulkLoadClientService clientService
MockSchemaRegistry registry
RecordReaderFactory reader
@@ -73,10 +93,12 @@ class PutElasticsearchRecordTest {
runner.addControllerService("reader", reader)
runner.addControllerService("clientService", clientService)
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
runner.setProperty(PutElasticsearchRecord.INDEX_OP, IndexOperationRequest.Operation.Index.getValue())
runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "test_timestamp")
runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
runner.enableControllerService(registry)
runner.enableControllerService(reader)
@@ -86,6 +108,23 @@ }
}
void basicTest(int failure, int retry, int success) {
def evalClosure = { List<IndexOperationRequest> items ->
int timestampDefaultCount = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
int indexCount = items.findAll { it.index == "test_index" }.size()
int typeCount = items.findAll { it.type == "test_type" }.size()
int opCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
Assert.assertEquals(2, timestampDefaultCount)
Assert.assertEquals(2, indexCount)
Assert.assertEquals(2, typeCount)
Assert.assertEquals(2, opCount)
}
basicTest(failure, retry, success, evalClosure)
}
void basicTest(int failure, int retry, int success, Closure evalClosure) {
clientService.evalClosure = evalClosure
runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
runner.run()
@@ -99,6 +138,17 @@ class PutElasticsearchRecordTest {
basicTest(0, 0, 1)
}
@Test
void simpleTestCoercedDefaultTimestamp() {
def evalClosure = { List<IndexOperationRequest> items ->
int timestampDefault = items.findAll { it.fields.get("@timestamp") == 100L }.size()
Assert.assertEquals(2, timestampDefault)
}
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
basicTest(0, 0, 1, evalClosure)
}
@Test
void simpleTestWithMockReader() {
reader = new MockRecordParser()
@ -127,14 +177,19 @@ class PutElasticsearchRecordTest {
name: "RecordPathTestType",
fields: [
[ name: "id", type: "string" ],
[ name: "op", type: "string" ],
[ name: "index", type: "string" ],
[ name: "type", type: "string" ],
[ name: "msg", type: ["null", "string"] ]
[ name: "msg", type: ["null", "string"] ],
[ name: "ts", type: [ type: "long", logicalType: "timestamp-millis" ] ],
[ name: "date", type: [ type: "int", logicalType: "date" ] ],
[ name: "time", type: [ type: "int", logicalType: "time-millis" ] ],
[ name: "code", type: "long" ]
]
]))
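// Only rec-1 supplies an epoch-millis "ts" value for the /ts record path; the remaining records fall back to the default @timestamp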
def flowFileContents = prettyPrint(toJson([
[ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello", ts: Timestamp.valueOf(LOCAL_DATE_TIME).toInstant().toEpochMilli() ],
[ id: "rec-2", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
[ id: "rec-3", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-4", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
@ -143,13 +198,19 @@ class PutElasticsearchRecordTest {
]))
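// Expect one @timestamp rendered from /ts with the default TIMESTAMP format and five from the "test_timestamp" default; the mapped ts and id fields should be stripped from the document bodies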
def evalClosure = { List<IndexOperationRequest> items ->
def a = items.findAll { it.index == "bulk_a" }.size()
def b = items.findAll { it.index == "bulk_b" }.size()
int a = items.findAll { it.index == "bulk_a" }.size()
int b = items.findAll { it.index == "bulk_b" }.size()
int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
int create = items.findAll { it.operation == IndexOperationRequest.Operation.Create }.size()
int msg = items.findAll { ("Hello" == it.fields.get("msg")) }.size()
int empties = items.findAll { ("" == it.fields.get("msg")) }.size()
int nulls = items.findAll { (null == it.fields.get("msg")) }.size()
int timestamp = items.findAll { it.fields.get("@timestamp") == LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())) }.size()
int timestampDefault = items.findAll { it.fields.get("@timestamp") == "test_timestamp" }.size()
int ts = items.findAll { it.fields.get("ts") != null }.size()
int id = items.findAll { it.fields.get("id") != null }.size()
items.each {
Assert.assertNotNull(it.id)
Assert.assertTrue(it.id.startsWith("rec-"))
@ -162,6 +223,10 @@ class PutElasticsearchRecordTest {
Assert.assertEquals(4, msg)
Assert.assertEquals(1, empties)
Assert.assertEquals(1, nulls)
Assert.assertEquals(1, timestamp)
Assert.assertEquals(5, timestampDefault)
Assert.assertEquals(0, ts)
Assert.assertEquals(0, id)
}
clientService.evalClosure = evalClosure
@ -173,6 +238,7 @@ class PutElasticsearchRecordTest {
runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/ts")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
@ -185,12 +251,12 @@ class PutElasticsearchRecordTest {
runner.clearTransferState()
flowFileContents = prettyPrint(toJson([
[ id: "rec-1", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
[ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
[ id: "rec-1", op: null, index: null, type: null, msg: "Hello", date: Date.valueOf(LOCAL_DATE).getTime() ],
[ id: "rec-2", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-3", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-4", op: null, index: null, type: null, msg: "Hello" ],
[ id: "rec-5", op: "update", index: null, type: null, msg: "Hello" ],
[ id: "rec-6", op: null, index: "bulk_b", type: "message", msg: "Hello" ]
]))
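// Only rec-1 carries a /date value, formatted with the custom dd/MM/yyyy pattern set below; the rest use the coerced long default (100), and the retain flags keep both the date and id fields in the documents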
evalClosure = { List<IndexOperationRequest> items ->
@ -200,17 +266,32 @@ class PutElasticsearchRecordTest {
def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
def updateOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
def timestampCount = items.findAll { it.fields.get("@timestamp") == LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy")) }.size()
int dateCount = items.findAll { it.fields.get("date") != null }.size()
def idCount = items.findAll { it.fields.get("id") != null }.size()
def defaultCoercedTimestampCount = items.findAll { it.fields.get("@timestamp") == 100L }.size()
Assert.assertEquals(5, testTypeCount)
Assert.assertEquals(1, messageTypeCount)
Assert.assertEquals(5, testIndexCount)
Assert.assertEquals(1, bulkIndexCount)
Assert.assertEquals(5, indexOperationCount)
Assert.assertEquals(1, updateOperationCount)
Assert.assertEquals(1, timestampCount)
Assert.assertEquals(5, defaultCoercedTimestampCount)
Assert.assertEquals(1, dateCount)
Assert.assertEquals(6, idCount)
}
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "\${operation}")
runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP, "100")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/date")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_DATE_FORMAT, "dd/MM/yyyy")
runner.setProperty(PutElasticsearchRecord.RETAIN_AT_TIMESTAMP_FIELD, "true")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest",
"operation": "index"
@ -228,7 +309,7 @@ class PutElasticsearchRecordTest {
[ id: "rec-3", msg: "Hello" ],
[ id: "rec-4", msg: "Hello" ],
[ id: "rec-5", msg: "Hello" ],
[ id: "rec-6", type: "message", msg: "Hello" ]
[ id: "rec-6", type: "message", msg: "Hello", time: Time.valueOf(LOCAL_TIME).getTime() ]
]))
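// With the default @timestamp removed, only rec-6's /time value should be mapped, rendered with the default TIME format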
evalClosure = { List<IndexOperationRequest> items ->
@ -236,15 +317,21 @@ class PutElasticsearchRecordTest {
def messageTypeCount = items.findAll { it.type == "message" }.size()
def nullIdCount = items.findAll { it.id == null }.size()
def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
def timestampCount = items.findAll { it.fields.get("@timestamp") == LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())) }.size()
Assert.assertEquals("null type", 5, nullTypeCount)
Assert.assertEquals("message type", 1, messageTypeCount)
Assert.assertEquals("null id", 2, nullIdCount)
Assert.assertEquals("rec- id", 4, recIdCount)
Assert.assertEquals("@timestamp", 1, timestampCount)
}
clientService.evalClosure = evalClosure
runner.setProperty(PutElasticsearchRecord.INDEX_OP, "index")
runner.removeProperty(PutElasticsearchRecord.AT_TIMESTAMP)
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/time")
runner.removeProperty(PutElasticsearchRecord.TYPE)
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
@ -262,7 +349,7 @@ class PutElasticsearchRecordTest {
[ id: "rec-3", op: "update", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-4", op: "upsert", index: "bulk_b", type: "message", msg: "Hello" ],
[ id: "rec-5", op: "create", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-6", op: "delete", index: "bulk_b", type: "message", msg: "Hello" ]
[ id: "rec-6", op: "delete", index: "bulk_b", type: "message", msg: "Hello", code: 101L ]
]))
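// A numeric record field (/code) mapped as @timestamp should pass through as a long; the other five records end up with no @timestamp at all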
clientService.evalClosure = { List<IndexOperationRequest> items ->
@ -271,13 +358,18 @@ class PutElasticsearchRecordTest {
int update = items.findAll { it.operation == IndexOperationRequest.Operation.Update }.size()
int upsert = items.findAll { it.operation == IndexOperationRequest.Operation.Upsert }.size()
int delete = items.findAll { it.operation == IndexOperationRequest.Operation.Delete }.size()
def timestampCount = items.findAll { it.fields.get("@timestamp") == 101L }.size()
def noTimestampCount = items.findAll { it.fields.get("@timestamp") == null }.size()
Assert.assertEquals(1, index)
Assert.assertEquals(2, create)
Assert.assertEquals(1, update)
Assert.assertEquals(1, upsert)
Assert.assertEquals(1, delete)
Assert.assertEquals(1, timestampCount)
Assert.assertEquals(5, noTimestampCount)
}
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/code")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
@ -285,6 +377,26 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.clearTransferState()
flowFileContents = prettyPrint(toJson([
[ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ]
]))
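// A plain string field (/msg) mapped as @timestamp should pass through unchanged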
clientService.evalClosure = { List<IndexOperationRequest> items ->
def timestampCount = items.findAll { it.fields.get("@timestamp") == "Hello" }.size()
Assert.assertEquals(1, timestampCount)
}
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "/msg")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
}
@Test
@ -354,4 +466,4 @@ class PutElasticsearchRecordTest {
def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
}
}
}