NIFI-10687 set Elasticsearch document _id to null if ID attribute evaluated to blank String for PutElasticsearchRecord or PutElasticsearchJson; use @timestamp default value if @timestamp record path evaluates to blank String in PutElasticsearchRecord

This closes #6575

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Chris Sampson 2022-10-24 21:28:01 +01:00 committed by Mike Thomsen
parent 2d5a8b8b0d
commit db49a861b3
4 changed files with 63 additions and 5 deletions

View File

@ -167,7 +167,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
final String id = StringUtils.isNotBlank(idAttribute) ? input.getAttribute(idAttribute) : null;
final String id = StringUtils.isNotBlank(idAttribute) && StringUtils.isNotBlank(input.getAttribute(idAttribute)) ? input.getAttribute(idAttribute) : null;
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue();

View File

@ -333,10 +333,10 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null;
final RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
final RecordPath path = StringUtils.isNotBlank(idPath) ? recordPathCache.getCompiled(idPath) : null;
final RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
final RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
final RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null;
final RecordPath atPath = StringUtils.isNotBlank(atTimestampPath) ? recordPathCache.getCompiled(atTimestampPath) : null;
final boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
final boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
@ -562,7 +562,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
fieldValue.updateValue(null);
}
return fieldValue.getValue().toString();
return fieldValue.toString();
} else {
return fallback;
}

View File

@ -161,7 +161,14 @@ class PutElasticsearchJsonTest {
clientService.evalParametersClosure = evalParametersClosure
basicTest(0, 0, 1, [slices: "auto"])
def evalClosure = { List<IndexOperationRequest> items ->
int nullIdCount = items.findAll { it.id == null }.size()
assertEquals(1, nullIdCount)
}
clientService.evalClosure = evalClosure
basicTest(0, 0, 1, [slices: "auto", "doc_id": ""])
}
@Test

View File

@ -478,6 +478,57 @@ class PutElasticsearchRecordTest {
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.clearTransferState()
flowFileContents = prettyPrint(toJson([
[ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello" ]
]))
clientService.evalClosure = { List<IndexOperationRequest> items ->
def nullIdCount = items.findAll { it.id == null }.size()
def noTimestampCount = items.findAll { it.fields.containsKey("@timestamp") }.size()
assertEquals(1, nullIdCount)
assertEquals(1, noTimestampCount)
}
runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "\${id_not_exist}")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "\${not_exist}")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
runner.clearTransferState()
flowFileContents = prettyPrint(toJson([
[ id: "rec-1", op: "index", index: "bulk_a", type: "message", msg: "Hello", empty: "" ]
]))
clientService.evalClosure = { List<IndexOperationRequest> items ->
def nullIdCount = items.findAll { it.id == null }.size()
def noTimestampCount = items.findAll { it.fields.containsKey("@timestamp") }.size()
assertEquals(1, nullIdCount)
assertEquals(1, noTimestampCount)
}
runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "\${will_be_empty}")
runner.setProperty(PutElasticsearchRecord.AT_TIMESTAMP_RECORD_PATH, "\${will_be_empty}")
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest",
"will_be_empty": "/empty"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
}
@Test