NIFI-12480 Updated MapRecord's toString() method to use the SerializedForm of the record when available and fixed bugs around ensuring that the serialized form is properly set

This closes #8132

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-12-05 16:33:18 -05:00 committed by exceptionfactory
parent facb43ea89
commit ba599d29c2
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 161 additions and 70 deletions

View File

@ -25,6 +25,7 @@ import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -43,23 +44,28 @@ public class EscapeJson extends RecordPathSegment {
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
Object value = fv.getValue();
if (value == null) {
return new StandardFieldValue(null, fv.getField(), fv.getParent().orElse(null));
} else {
if (value instanceof Record) {
value = DataTypeUtils.convertRecordFieldtoObject(value, RecordFieldType.RECORD.getDataType());
}
try {
return new StandardFieldValue(objectMapper.writeValueAsString(value), fv.getField(), fv.getParent().orElse(null));
} catch (JsonProcessingException e) {
throw new RecordPathException("Unable to serialise Record Path value as JSON String", e);
}
return fieldValues
.filter(fieldValue -> fieldValue.getValue() != null)
.map(fieldValue -> {
Object value = fieldValue.getValue();
if (value == null) {
return new StandardFieldValue(null, fieldValue.getField(), fieldValue.getParent().orElse(null));
} else {
if (value instanceof Record) {
value = DataTypeUtils.convertRecordFieldtoObject(value, RecordFieldType.RECORD.getDataType());
}
});
try {
final RecordField originalField = fieldValue.getField();
final RecordField escapedField = new RecordField(originalField.getFieldName(), RecordFieldType.STRING.getDataType(),
null, originalField.getAliases(), originalField.isNullable());
return new StandardFieldValue(objectMapper.writeValueAsString(value), escapedField, fieldValue.getParent().orElse(null));
} catch (JsonProcessingException e) {
throw new RecordPathException("Unable to serialise Record Path value as JSON String", e);
}
}
});
}
}

View File

@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
@ -52,6 +53,7 @@ public class MapRecord implements Record {
private final boolean checkTypes;
private final boolean dropUnknownFields;
private Set<RecordField> inactiveFields = null;
private Map<String, RecordField> updatedFields = null;
public MapRecord(final RecordSchema schema, final Map<String, Object> values) {
this(schema, values, false, false);
@ -298,10 +300,9 @@ public class MapRecord implements Record {
if (obj == null) {
return false;
}
if (!(obj instanceof MapRecord)) {
if (!(obj instanceof final MapRecord other)) {
return false;
}
final MapRecord other = (MapRecord) obj;
return schema.equals(other.schema) && valuesEqual(values, other.values);
}
@ -335,14 +336,68 @@ public class MapRecord implements Record {
@Override
public String toString() {
return "MapRecord[" + values + "]";
final Optional<SerializedForm> serializedForm = getSerializedForm();
if (serializedForm.isEmpty()) {
return "MapRecord[" + values + "]";
}
final Object serialized = serializedForm.get().getSerialized();
return serialized == null ? "MapRecord[" + values + "]" : serialized.toString();
}
/**
 * Returns the serialized form of this record, if one is still usable.
 *
 * @return an empty Optional when this record has no serialized form or when
 *         {@link #isSerializedFormReset()} reports that it has been reset;
 *         otherwise the stored serialized form
 */
@Override
public Optional<SerializedForm> getSerializedForm() {
    final boolean unavailable = serializedForm.isEmpty() || isSerializedFormReset();
    return unavailable ? Optional.empty() : serializedForm;
}
/**
 * Checks whether the serialized form of this record can no longer be used.
 *
 * @return {@code true} if this record has no serialized form, or if any of its
 *         values is reported as reset by {@code isSerializedFormReset(Object)};
 *         {@code false} otherwise
 */
private boolean isSerializedFormReset() {
    // Without a serialized form of our own there is nothing left to preserve.
    if (serializedForm.isEmpty()) {
        return true;
    }

    // A single reset value anywhere in this record is enough.
    return values.values().stream()
        .anyMatch(fieldValue -> isSerializedFormReset(fieldValue));
}
/**
 * Checks whether the given field value contains a record whose serialized form has been reset.
 * <p>
 * A {@link MapRecord} is asked directly. Collections and object arrays are searched
 * element by element, recursively. Any other value, including {@code null}, holds no
 * nested record and therefore cannot be reset.
 *
 * @param value the field value (or collection/array element) to inspect; may be {@code null}
 * @return {@code true} if the value is, or contains, a {@link MapRecord} that reports its
 *         serialized form as reset; {@code false} otherwise
 */
private boolean isSerializedFormReset(final Object value) {
    // A null value holds no nested record, so it must not invalidate the parent's serialized
    // form. Returning true here would discard the serialized form of every record that has
    // a null field.
    if (value == null) {
        return false;
    }

    if (value instanceof final MapRecord childRecord) {
        return childRecord.isSerializedFormReset();
    }

    if (value instanceof final Collection<?> collection) {
        for (final Object collectionValue : collection) {
            if (isSerializedFormReset(collectionValue)) {
                return true;
            }
        }
    } else if (value instanceof final Object[] array) {
        for (final Object arrayValue : array) {
            if (isSerializedFormReset(arrayValue)) {
                return true;
            }
        }
    }

    return false;
}
@Override
public Map<String, Object> toMap() {
return toMap(false);
@ -365,8 +420,7 @@ public class MapRecord implements Record {
maps[index] = ((MapRecord) records[index]).toMap(true);
}
valueToAdd = maps;
} else if (value instanceof List) {
List<?> valueList = (List<?>) value;
} else if (value instanceof final List<?> valueList) {
if (!valueList.isEmpty() && valueList.get(0) instanceof MapRecord) {
List<Map<String, Object>> newRecords = new ArrayList<>();
for (Object o : valueList) {
@ -395,7 +449,18 @@ public class MapRecord implements Record {
public void setValue(final RecordField field, final Object value) {
final Optional<RecordField> existingField = setValueAndGetField(field.getFieldName(), value);
if (!existingField.isPresent()) {
// Keep track of any fields whose definition has been added or changed so that it can be taken into account when
// calling #incorporateInactiveFields
if (existingField.isPresent()) {
final RecordField existingRecordField = existingField.get();
final RecordField merged = DataTypeUtils.merge(existingRecordField, field);
if (!Objects.equals(existingRecordField, merged)) {
if (updatedFields == null) {
updatedFields = new LinkedHashMap<>();
}
updatedFields.put(field.getFieldName(), merged);
}
} else {
if (inactiveFields == null) {
inactiveFields = new LinkedHashSet<>();
}
@ -442,8 +507,7 @@ public class MapRecord implements Record {
final Object fieldValue = getValue(schemaField);
if (schemaField.getDataType().getFieldType() == RecordFieldType.CHOICE) {
schemaFields.add(schemaField);
} else if (fieldValue instanceof Record) {
final Record childRecord = (Record) fieldValue;
} else if (fieldValue instanceof final Record childRecord) {
schemaFields.add(new RecordField(schemaField.getFieldName(), RecordFieldType.RECORD.getRecordDataType(childRecord.getSchema()), schemaField.isNullable()));
} else {
schemaFields.add(schemaField);
@ -457,7 +521,7 @@ public class MapRecord implements Record {
public void setValue(final String fieldName, final Object value) {
final Optional<RecordField> existingField = setValueAndGetField(fieldName, value);
if (!existingField.isPresent()) {
if (existingField.isEmpty()) {
if (inactiveFields == null) {
inactiveFields = new LinkedHashSet<>();
}
@ -468,9 +532,10 @@ public class MapRecord implements Record {
}
}
private Optional<RecordField> setValueAndGetField(final String fieldName, final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (!field.isPresent()) {
if (field.isEmpty()) {
if (dropUnknownFields) {
return field;
}
@ -496,7 +561,7 @@ public class MapRecord implements Record {
@Override
public void setArrayValue(final String fieldName, final int arrayIndex, final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (!field.isPresent()) {
if (field.isEmpty()) {
return;
}
@ -535,7 +600,7 @@ public class MapRecord implements Record {
@SuppressWarnings("unchecked")
public void setMapValue(final String fieldName, final String mapKey, final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (!field.isPresent()) {
if (field.isEmpty()) {
return;
}
@ -601,16 +666,24 @@ public class MapRecord implements Record {
}
private RecordField getUpdatedRecordField(final RecordField field) {
final DataType dataType = field.getDataType();
final String fieldName = field.getFieldName();
final RecordField specField;
if (updatedFields == null) {
specField = field;
} else {
specField = updatedFields.getOrDefault(fieldName, field);
}
final DataType dataType = specField.getDataType();
final RecordFieldType fieldType = dataType.getFieldType();
if (isSimpleType(fieldType)) {
return field;
return specField;
}
final Object value = getValue(field);
final Object value = getValue(specField);
if (value == null) {
return field;
return specField;
}
if (fieldType == RecordFieldType.RECORD && value instanceof Record) {
@ -622,8 +695,7 @@ public class MapRecord implements Record {
final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema);
final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema);
final RecordField updatedField = new RecordField(field.getFieldName(), combinedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
return updatedField;
return new RecordField(specField.getFieldName(), combinedDataType, specField.getDefaultValue(), specField.getAliases(), specField.isNullable());
}
if (fieldType == RecordFieldType.ARRAY && value instanceof Object[]) {
@ -646,11 +718,10 @@ public class MapRecord implements Record {
final DataType mergedRecordType = RecordFieldType.RECORD.getRecordDataType(mergedSchema);
final DataType mergedDataType = RecordFieldType.ARRAY.getArrayDataType(mergedRecordType);
final RecordField updatedField = new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
return updatedField;
return new RecordField(specField.getFieldName(), mergedDataType, specField.getDefaultValue(), specField.getAliases(), specField.isNullable());
}
return field;
return specField;
}
if (fieldType == RecordFieldType.CHOICE) {
@ -659,7 +730,7 @@ public class MapRecord implements Record {
final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType);
if (chosenDataType.getFieldType() != RecordFieldType.RECORD || !(value instanceof Record)) {
return field;
return specField;
}
final RecordDataType recordDataType = (RecordDataType) chosenDataType;
@ -681,22 +752,18 @@ public class MapRecord implements Record {
}
final DataType mergedDataType = RecordFieldType.CHOICE.getChoiceDataType(updatedPossibleTypes);
return new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
return new RecordField(specField.getFieldName(), mergedDataType, specField.getDefaultValue(), specField.getAliases(), specField.isNullable());
}
return field;
return specField;
}
private boolean isSimpleType(final RecordFieldType fieldType) {
switch (fieldType) {
case ARRAY:
case RECORD:
case MAP:
case CHOICE:
return false;
}
return switch (fieldType) {
case ARRAY, RECORD, MAP, CHOICE -> false;
default -> true;
};
return true;
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;
import java.util.Objects;
import java.util.function.Supplier;
public interface SerializedForm {
/**
@ -30,7 +31,7 @@ public interface SerializedForm {
*/
String getMimeType();
public static SerializedForm of(final java.util.function.Supplier<Object> serializedSupplier, final String mimeType) {
static SerializedForm of(final Supplier<?> serializedSupplier, final String mimeType) {
Objects.requireNonNull(serializedSupplier);
Objects.requireNonNull(mimeType);
@ -68,17 +69,16 @@ public interface SerializedForm {
return false;
}
if (!(obj instanceof SerializedForm)) {
if (!(obj instanceof final SerializedForm other)) {
return false;
}
final SerializedForm other = (SerializedForm) obj;
return other.getMimeType().equals(mimeType) && Objects.deepEquals(other.getSerialized(), getSerialized());
}
};
}
public static SerializedForm of(final Object serialized, final String mimeType) {
static SerializedForm of(final Object serialized, final String mimeType) {
Objects.requireNonNull(serialized);
Objects.requireNonNull(mimeType);
@ -108,11 +108,10 @@ public interface SerializedForm {
return false;
}
if (!(obj instanceof SerializedForm)) {
if (!(obj instanceof final SerializedForm other)) {
return false;
}
final SerializedForm other = (SerializedForm) obj;
return other.getMimeType().equals(mimeType) && Objects.deepEquals(other.getSerialized(), getSerialized());
}
};

View File

@ -35,6 +35,7 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
@ -399,7 +400,8 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
childValues.put(childFieldName, childValue);
}
return new MapRecord(childSchema, childValues);
final SerializedForm serializedForm = SerializedForm.of(fieldNode::toString, "application/json");
return new MapRecord(childSchema, childValues, serializedForm);
}
protected JsonNode getNextJsonNode() throws IOException, MalformedRecordException {

View File

@ -61,6 +61,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private String mimeType = "application/json";
private final boolean prettyPrint;
private static final ObjectMapper objectMapper = new ObjectMapper();
@ -94,6 +95,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
factory.setCodec(objectMapper);
this.generator = factory.createGenerator(out);
this.prettyPrint = prettyPrint;
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
} else if (OutputGrouping.OUTPUT_ONELINE.equals(outputGrouping)) {
@ -173,9 +175,12 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
final SerializedForm form = serializedForm.get();
if (form.getMimeType().equals(getMimeType()) && record.getSchema().equals(writeSchema)) {
final Object serialized = form.getSerialized();
if (serialized instanceof String) {
generator.writeRawValue((String) serialized);
return;
if (serialized instanceof final String serializedString) {
final boolean serializedPretty = serializedString.contains("\n");
if (serializedPretty == this.prettyPrint) {
generator.writeRawValue((String) serialized);
return;
}
}
}
}

View File

@ -450,14 +450,17 @@ class TestJsonTreeRowRecordReader {
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account=MapRecord[{id=42, balance=4750.89}]}";
final String expectedRecord = String.format("MapRecord[%s]", expectedMap);
final String expectedRecordToString = """
{"id":1,"name":"John Doe","address":"123 My Street","city":"My City","state":"MS","zipCode":"11111","country":"USA","account":{"id":42,"balance":4750.89}}""";
final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account={\"id\":42,\"balance\":4750.89}}";
try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(expectedRecord, rawRecord.toString());
assertEquals(expectedRecordToString, rawRecord.toString());
final Map<String, Object> map = rawRecord.toMap();
assertEquals(expectedMap, map.toString());

View File

@ -133,14 +133,23 @@ class TestWriteJsonResult {
final Map<String, Object> values1 = new HashMap<>();
values1.put("name", "John Doe");
values1.put("age", 42);
final String serialized1 = "{ \"name\": \"John Doe\", \"age\": 42 }";
final String serialized1 = """
{
"name": "John Doe",
"age": 42
}""";
final SerializedForm serializedForm1 = SerializedForm.of(serialized1, "application/json");
final Record record1 = new MapRecord(schema, values1, serializedForm1);
final Map<String, Object> values2 = new HashMap<>();
values2.put("name", "Jane Doe");
values2.put("age", 43);
final String serialized2 = "{ \"name\": \"Jane Doe\", \"age\": 43 }";
final String serialized2 = """
{
"name": "Jane Doe",
"age": 43
}""";
final SerializedForm serializedForm2 = SerializedForm.of(serialized2, "application/json");
final Record record2 = new MapRecord(schema, values1, serializedForm2);
@ -154,11 +163,9 @@ class TestWriteJsonResult {
writer.write(rs);
}
final byte[] data = baos.toByteArray();
final String expected = "[ " + serialized1 + ", " + serialized2 + " ]";
final String output = new String(data, StandardCharsets.UTF_8);
final String output = baos.toString(StandardCharsets.UTF_8);
assertEquals(expected, output);
}

View File

@ -276,14 +276,16 @@ class TestYamlTreeRowRecordReader {
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account=MapRecord[{id=42, balance=4750.89}]}";
final String expectedRecord = String.format("MapRecord[%s]", expectedMap);
final String expectedRecordToString = """
{"id":1,"name":"John Doe","address":"123 My Street","city":"My City","state":"MS","zipCode":"11111","country":"USA","account":{"id":42,"balance":4750.89}}""";
final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account={\"id\":42,\"balance\":4750.89}}";
try (final InputStream in = Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested.yaml"));
final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(expectedRecord, rawRecord.toString());
assertEquals(expectedRecordToString, rawRecord.toString());
final Map<String, Object> map = rawRecord.toMap();
assertEquals(expectedMap, map.toString());