NIFI-4767 - Fixed issues with RecordPath using the wrong field name for arrays and maps. Also addressed issue where Avro Reader was returning a Record object when it should return a Map

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2398.
This commit is contained in:
Mark Payne 2018-01-11 15:04:14 -05:00 committed by Pierre Villard
parent 5f7bd81af9
commit a36afe0bbe
6 changed files with 75 additions and 34 deletions

View File

@ -19,7 +19,6 @@ package org.apache.nifi.record.path.paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.nifi.record.path.ArrayIndexFieldValue;
@ -56,24 +55,19 @@ public class MultiArrayIndexPath extends RecordPathSegment {
return indices.stream()
.filter(range -> values.length > Math.abs(range.getMin()))
.flatMap(range -> {
final List<Object> valuesWithinRange = new ArrayList<>();
final List<Integer> indexes = new ArrayList<Integer>();
final int min = range.getMin() < 0 ? values.length + range.getMin() : range.getMin();
final int max = range.getMax() < 0 ? values.length + range.getMax() : range.getMax();
final List<FieldValue> indexFieldValues = new ArrayList<>(Math.max(0, max - min));
for (int i = min; i <= max; i++) {
if (values.length > i) {
valuesWithinRange.add(values[i]);
indexes.add(i);
final RecordField elementField = new RecordField(arrayField.getFieldName(), elementDataType);
final FieldValue arrayIndexFieldValue = new ArrayIndexFieldValue(values[i], elementField, fieldValue, i);
indexFieldValues.add(arrayIndexFieldValue);
}
}
return IntStream.range(0, valuesWithinRange.size())
.mapToObj(index -> {
final RecordField elementField = new RecordField(arrayField.getFieldName(), elementDataType);
return new ArrayIndexFieldValue(valuesWithinRange.get(index), elementField, fieldValue, indexes.get(index));
});
return indexFieldValues.stream();
});
});

View File

@ -49,7 +49,7 @@ public class MultiMapKeyPath extends RecordPathSegment {
final Map<String, ?> map = (Map<String, ?>) fieldValue.getValue();
return mapKeys.stream().map(key -> {
final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + key + "']", valueType);
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
return new MapEntryFieldValue(map.get(key), elementField, fieldValue, key);
});
});

View File

@ -45,7 +45,7 @@ public class SingularMapKeyPath extends RecordPathSegment {
.filter(Filters.fieldTypeFilter(RecordFieldType.MAP))
.map(fieldValue -> {
final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + mapKey + "']", valueType);
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
return new MapEntryFieldValue(getMapValue(fieldValue), elementField, fieldValue, mapKey);
});
}

View File

@ -59,7 +59,7 @@ public class WildcardIndexPath extends RecordPathSegment {
return map.entrySet().stream()
.map(entry -> {
final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + entry.getKey() + "']", valueType);
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType);
return new MapEntryFieldValue(entry.getValue(), elementField, fieldValue, entry.getKey());
});
} else {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.record.path;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -28,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.record.path.exception.RecordPathException;
@ -134,6 +134,10 @@ public class TestRecordPath {
assertEquals("balance", fieldValues.get(1).getField().getFieldName());
assertEquals(123.45D, fieldValues.get(1).getValue());
RecordPath.compile("/mainAccount/*[. > 100]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(122.44D));
assertEquals(1, accountValues.get("id"));
assertEquals(122.44D, accountValues.get("balance"));
}
@Test
@ -154,9 +158,21 @@ public class TestRecordPath {
assertEquals(1, fieldValues.size());
final FieldValue fieldValue = fieldValues.get(0);
assertTrue(fieldValue.getField().getFieldName().equals("accounts"));
assertEquals("accounts", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
assertEquals(accountRecord, fieldValue.getValue());
final Map<String, Object> updatedAccountValues = new HashMap<>(accountValues);
updatedAccountValues.put("balance", 122.44D);
final Record updatedAccountRecord = new MapRecord(getAccountSchema(), updatedAccountValues);
RecordPath.compile("/*[0]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(updatedAccountRecord));
final Object[] accountRecords = (Object[]) record.getValue("accounts");
assertEquals(1, accountRecords.length);
final Record recordToVerify = (Record) accountRecords[0];
assertEquals(122.44D, recordToVerify.getValue("balance"));
assertEquals(48, record.getValue("id"));
assertEquals("John Doe", record.getValue("name"));
}
@Test
@ -228,11 +244,30 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get();
assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['"));
assertTrue(fieldValue.getField().getFieldName().equals("attributes"));
assertEquals("New York", fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get());
}
@Test
@SuppressWarnings("unchecked")
public void testUpdateMap() {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
final Map<String, String> attributes = new HashMap<>();
attributes.put("city", "New York");
attributes.put("state", "NY");
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
values.put("attributes", attributes);
final Record record = new MapRecord(schema, values);
RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get().updateValue("Boston");
assertEquals("Boston", ((Map<String, Object>) record.getValue("attributes")).get("city"));
}
@Test
public void testMapWildcard() {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
@ -254,9 +289,18 @@ public class TestRecordPath {
assertEquals("NY", fieldValues.get(1).getValue());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['"));
assertEquals("attributes", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
RecordPath.compile("/attributes[*]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue("Unknown"));
assertEquals("Unknown", attributes.get("city"));
assertEquals("Unknown", attributes.get("state"));
RecordPath.compile("/attributes[*][fieldName(.) = 'attributes']").evaluate(record).getSelectedFields().forEach(field -> field.updateValue("Unknown"));
assertEquals("Unknown", attributes.get("city"));
assertEquals("Unknown", attributes.get("state"));
}
@Test
@ -280,9 +324,13 @@ public class TestRecordPath {
assertEquals("NY", fieldValues.get(1).getValue());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['"));
assertEquals("attributes", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
RecordPath.compile("/attributes['city', 'state']").evaluate(record).getSelectedFields().forEach(field -> field.updateValue("Unknown"));
assertEquals("Unknown", attributes.get("city"));
assertEquals("Unknown", attributes.get("state"));
}
@Test
@ -314,7 +362,7 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("/numbers[3]").evaluate(record).getSelectedFields().findFirst().get();
assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(3, fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get());
}
@ -330,7 +378,7 @@ public class TestRecordPath {
final List<FieldValue> fieldValues = RecordPath.compile("/numbers[0..1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
@ -354,11 +402,13 @@ public class TestRecordPath {
int i = 0;
final int[] expectedValues = new int[] {3, 6, 9, 8};
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(expectedValues[i++], fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get());
}
RecordPath.compile("/numbers[3,6, -1, -2]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(99));
assertArrayEquals(new Object[] {0, 1, 2, 99, 4, 5, 99, 7, 99, 99}, (Object[]) values.get("numbers"));
}
@Test
@ -372,7 +422,7 @@ public class TestRecordPath {
List<FieldValue> fieldValues = RecordPath.compile("/numbers[0, 2, 4..7, 9]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
@ -384,7 +434,7 @@ public class TestRecordPath {
fieldValues = RecordPath.compile("/numbers[0..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
@ -396,7 +446,7 @@ public class TestRecordPath {
fieldValues = RecordPath.compile("/numbers[-1..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
expectedValues = new int[] {9};
@ -407,7 +457,7 @@ public class TestRecordPath {
fieldValues = RecordPath.compile("/numbers[*]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals("numbers", fieldValue.getField().getFieldName());
assertEquals(record, fieldValue.getParentRecord().get());
}
expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
@ -441,7 +491,7 @@ public class TestRecordPath {
for (final FieldValue fieldValue : fieldValues) {
final String fieldName = fieldValue.getField().getFieldName();
assertTrue(Pattern.compile("numbers").matcher(fieldName).matches());
assertEquals("numbers", fieldName);
assertEquals(RecordFieldType.INT, fieldValue.getField().getDataType().getFieldType());
assertEquals(4, fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get());
@ -478,6 +528,9 @@ public class TestRecordPath {
assertEquals(accountRecord, fieldValue.getParentRecord().get());
assertEquals(123.45D, fieldValue.getValue());
assertEquals("balance", fieldValue.getField().getFieldName());
RecordPath.compile("/mainAccount/././balance/.").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(123.44D));
assertEquals(123.44D, accountValues.get("balance"));
}
@Test

View File

@ -832,13 +832,7 @@ public class AvroTypeUtil {
map.put(key, obj);
}
final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
final List<RecordField> mapFields = new ArrayList<>();
for (final String key : map.keySet()) {
mapFields.add(new RecordField(key, elementType, true));
}
final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
return new MapRecord(mapSchema, map);
return map;
}
return value;