NIFI-8133 Add ability to suppress null/empty values in ElasticSearchClientService for PutElasticsearchRecord output

NIFI-8133 Pass null/empty field handling from PutElasticsearchRecord to ElasticSearchClientServiceImpl

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4755
This commit is contained in:
Chris Sampson 2021-01-12 16:25:37 +00:00 committed by Matthew Burgess
parent bf960cae2e
commit e0a1cb8a01
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
6 changed files with 87 additions and 39 deletions

View File

@ -18,6 +18,7 @@ package org.apache.nifi.elasticsearch;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ControllerService;
@ -99,6 +100,21 @@ public interface ElasticSearchClientService extends ControllerService {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
/**
 * Allowable value {@code always-suppress}: missing, null and empty fields are left out of the written output.
 */
AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
"Fields that are missing (present in the schema but not in the record), or that have a value of null/empty, will not be written out");
/**
 * Allowable value {@code never-suppress}: missing, null and empty fields are written out as null/empty values.
 */
AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
"Fields that are missing (present in the schema but not in the record), or that have a value of null/empty, will be written out as a null/empty value");
/**
 * Required property ({@code el-cs-suppress-nulls}) selecting how null and empty fields are handled.
 * The value is restricted to {@link #NEVER_SUPPRESS} or {@link #ALWAYS_SUPPRESS} and defaults to
 * {@link #ALWAYS_SUPPRESS}.
 */
PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
.name("el-cs-suppress-nulls")
.displayName("Suppress Null/Empty Values")
.description("Specifies how the writer should handle null and empty fields (including objects and arrays)")
.allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS)
.defaultValue(ALWAYS_SUPPRESS.getValue())
.required(true)
.build();
/**
* Index a document.
*

View File

@ -196,7 +196,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M5</version>
<version>3.0.0-M3</version>
<configuration>
<systemPropertyVariables>
<type_name>${type.name}</type_name>

View File

@ -17,6 +17,7 @@
package org.apache.nifi.elasticsearch;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
@ -32,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
@ -57,7 +59,7 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
private final ObjectMapper mapper = new ObjectMapper();
private ObjectMapper mapper;
private static final List<PropertyDescriptor> properties;
@ -76,6 +78,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
props.add(ElasticSearchClientService.RETRY_TIMEOUT);
props.add(ElasticSearchClientService.CHARSET);
props.add(ElasticSearchClientService.SUPPRESS_NULLS);
properties = Collections.unmodifiableList(props);
}
@ -90,6 +93,13 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
try {
setupClient(context);
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
// Re-create the ObjectMapper each time the service is enabled in case the SUPPRESS_NULLS property has
// changed - the JsonInclude settings aren't dynamic, so a fresh mapper is configured instead.
mapper = new ObjectMapper();
if (ALWAYS_SUPPRESS.getValue().equals(context.getProperty(SUPPRESS_NULLS).getValue())) {
    // setSerializationInclusion replaces the previous inclusion rule rather than adding to it, so a
    // preceding NON_NULL call would simply be overwritten. NON_EMPTY already excludes null values in
    // addition to empty strings, arrays, collections and maps, so this single call covers both cases.
    mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}
} catch (Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.", ex);
throw new InitializationException(ex);

View File

@ -25,6 +25,7 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.security.util.KeystoreType
import org.apache.nifi.ssl.StandardSSLContextService
import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.After
@ -41,7 +42,7 @@ class ElasticSearch5ClientService_IT {
private ElasticSearchClientServiceImpl service
static String INDEX = "messages"
static String TYPE = System.getProperty("type_name")
static String TYPE = StringUtils.isNotBlank(System.getProperty("type_name")) ? System.getProperty("type_name") : null;
@Before
void before() throws Exception {
@ -52,6 +53,7 @@ class ElasticSearch5ClientService_IT {
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000")
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000")
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue())
try {
runner.enableControllerService(service)
} catch (Exception ex) {
@ -176,6 +178,50 @@ class ElasticSearch5ClientService_IT {
runner.assertValid()
}
@Test
void testNullSuppression() {
    // One populated field plus each kind of "nothing": null, empty string, blank string, empty map, empty list.
    final Map<String, Object> doc = new HashMap<String, Object>()
    doc.put("msg", "test")
    doc.put("is_null", null)
    doc.put("is_empty", "")
    doc.put("is_blank", " ")
    doc.put("empty_nested", Collections.emptyMap())
    doc.put("empty_array", Collections.emptyList())

    // Pass 1 - suppression disabled: the fetched document is expected to equal what was indexed.
    suppressNulls(false)
    final IndexOperationRequest unsuppressedRequest =
            new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)
    final IndexOperationResponse unsuppressedResponse = service.bulk([unsuppressedRequest])
    Assert.assertNotNull(unsuppressedResponse)
    Assert.assertTrue(unsuppressedResponse.getTook() > 0)
    Thread.sleep(2000)
    final Map<String, Object> unsuppressedResult = service.get("nulls", TYPE, "1")
    Assert.assertEquals(doc, unsuppressedResult)

    // Pass 2 - suppression enabled: only the populated and the blank (non-empty) fields should remain.
    suppressNulls(true)
    final IndexOperationRequest suppressedRequest =
            new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)
    final IndexOperationResponse suppressedResponse = service.bulk([suppressedRequest])
    Assert.assertNotNull(suppressedResponse)
    Assert.assertTrue(suppressedResponse.getTook() > 0)
    Thread.sleep(2000)
    final Map<String, Object> suppressedResult = service.get("nulls", TYPE, "2")
    final Set<String> storedKeys = suppressedResult.keySet()
    final String dump = suppressedResult.toString()
    Assert.assertTrue("Non-nulls (present): " + dump, storedKeys.containsAll(["msg", "is_blank"]))
    Assert.assertFalse("is_null (should be omitted): " + dump, storedKeys.contains("is_null"))
    Assert.assertFalse("is_empty (should be omitted): " + dump, storedKeys.contains("is_empty"))
    Assert.assertFalse("empty_nested (should be omitted): " + dump, storedKeys.contains("empty_nested"))
    Assert.assertFalse("empty_array (should be omitted): " + dump, storedKeys.contains("empty_array"))
}
/**
 * Switches the client service's SUPPRESS_NULLS property by disabling the service, setting the
 * property and enabling it again, then asserts that the runner is valid.
 *
 * @param suppressNulls true selects ALWAYS_SUPPRESS, false selects NEVER_SUPPRESS
 */
private void suppressNulls(final boolean suppressNulls) {
    String handling = ElasticSearchClientService.NEVER_SUPPRESS.getValue()
    if (suppressNulls) {
        handling = ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()
    }

    runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service")
    // the property is changed while the service is disabled, then the service is re-enabled
    runner.disableControllerService(service)
    runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, handling)
    runner.enableControllerService(service)
    runner.assertValid()
}
@Test
void testBulkAddTwoIndexes() throws Exception {
List<IndexOperationRequest> payload = new ArrayList<>()
@ -258,10 +304,9 @@ class ElasticSearch5ClientService_IT {
@Test
void testGetBulkResponsesWithErrors() {
def ops = [
new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Index),
new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "Hi", intField: "notaninteger"], IndexOperationRequest.Operation.Index)
new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: 1], IndexOperationRequest.Operation.Index), // OK
new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "two", intField: 1], IndexOperationRequest.Operation.Create), // already exists
new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: "notaninteger"], IndexOperationRequest.Operation.Index) // can't parse int field
]
def response = service.bulk(ops)
assert response.hasErrors()

View File

@ -50,14 +50,12 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -232,8 +230,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
removeEmpty(contentMap);
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
originals.add(record);
@ -336,31 +332,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
}
/**
 * Strips entries whose value is null or whose string form is blank from the given map, in place.
 * Values that are maps are cleaned recursively, as are map elements found directly inside list values.
 * Other list elements are left untouched, and a nested map or list is kept even if it ends up empty.
 *
 * @param input the map to clean; its contents are replaced with the retained entries
 */
private void removeEmpty(Map<String, Object> input) {
    // Work on a copy so that entries can be dropped while the original is being iterated.
    final Map<String, Object> retained = new HashMap<>(input);

    for (final Map.Entry<String, Object> field : input.entrySet()) {
        final Object value = field.getValue();

        if (value == null || StringUtils.isBlank(value.toString())) {
            retained.remove(field.getKey());
            continue;
        }

        if (value instanceof Map) {
            removeEmpty((Map<String, Object>) value);
            continue;
        }

        if (value instanceof List) {
            for (final Object element : (List<?>) value) {
                if (element instanceof Map) {
                    removeEmpty((Map<String, Object>) element);
                }
            }
        }
    }

    // Swap the surviving entries back into the caller's map instance.
    input.clear();
    input.putAll(retained);
}
private String getFromRecordPath(Record record, RecordPath path, final String fallback) {
if (path == null) {
return fallback;

View File

@ -129,7 +129,7 @@ class PutElasticsearchRecordTest {
[ name: "id", type: "string" ],
[ name: "index", type: "string" ],
[ name: "type", type: "string" ],
[ name: "msg", type: "string" ]
[ name: "msg", type: ["null", "string"] ]
]
]))
@ -138,8 +138,8 @@ class PutElasticsearchRecordTest {
[ id: "rec-2", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
[ id: "rec-3", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-4", op: "index", index: "bulk_b", type: "message", msg: "Hello" ],
[ id: "rec-5", op: "index", index: "bulk_a", type: "message", msg: "Hello" ],
[ id: "rec-6", op: "create", index: "bulk_b", type: "message", msg: "Hello" ]
[ id: "rec-5", op: "index", index: "bulk_a", type: "message", msg: "" ],
[ id: "rec-6", op: "create", index: "bulk_b", type: "message", msg: null ]
]))
def evalClosure = { List<IndexOperationRequest> items ->
@ -147,6 +147,9 @@ class PutElasticsearchRecordTest {
def b = items.findAll { it.index == "bulk_b" }.size()
int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
int create = items.findAll { it.operation == IndexOperationRequest.Operation.Create }.size()
int msg = items.findAll { ("Hello" == it.fields.get("msg")) }.size()
int empties = items.findAll { ("" == it.fields.get("msg")) }.size()
int nulls = items.findAll { (null == it.fields.get("msg")) }.size()
items.each {
Assert.assertNotNull(it.id)
Assert.assertTrue(it.id.startsWith("rec-"))
@ -156,6 +159,9 @@ class PutElasticsearchRecordTest {
Assert.assertEquals(3, b)
Assert.assertEquals(5, index)
Assert.assertEquals(1, create)
Assert.assertEquals(4, msg)
Assert.assertEquals(1, empties)
Assert.assertEquals(1, nulls)
}
clientService.evalClosure = evalClosure