From a36afe0bbe0051b528810d0670757d3401c80215 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 11 Jan 2018 15:04:14 -0500 Subject: [PATCH] NIFI-4767 - Fixed issues with RecordPath using the wrong field name for arrays and maps. Also addressed issue where Avro Reader was returning a Record object when it should return a Map Signed-off-by: Pierre Villard This closes #2398. --- .../path/paths/MultiArrayIndexPath.java | 16 ++-- .../record/path/paths/MultiMapKeyPath.java | 2 +- .../record/path/paths/SingularMapKeyPath.java | 2 +- .../record/path/paths/WildcardIndexPath.java | 2 +- .../nifi/record/path/TestRecordPath.java | 79 ++++++++++++++++--- .../org/apache/nifi/avro/AvroTypeUtil.java | 8 +- 6 files changed, 75 insertions(+), 34 deletions(-) diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java index 88ce61438e..ce680c6bbb 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java @@ -19,7 +19,6 @@ package org.apache.nifi.record.path.paths; import java.util.ArrayList; import java.util.List; -import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.nifi.record.path.ArrayIndexFieldValue; @@ -56,24 +55,19 @@ public class MultiArrayIndexPath extends RecordPathSegment { return indices.stream() .filter(range -> values.length > Math.abs(range.getMin())) .flatMap(range -> { - final List valuesWithinRange = new ArrayList<>(); - final List indexes = new ArrayList(); - final int min = range.getMin() < 0 ? values.length + range.getMin() : range.getMin(); final int max = range.getMax() < 0 ? values.length + range.getMax() : range.getMax(); + final List indexFieldValues = new ArrayList<>(Math.max(0, max - min)); for (int i = min; i <= max; i++) { if (values.length > i) { - valuesWithinRange.add(values[i]); - indexes.add(i); + final RecordField elementField = new RecordField(arrayField.getFieldName(), elementDataType); + final FieldValue arrayIndexFieldValue = new ArrayIndexFieldValue(values[i], elementField, fieldValue, i); + indexFieldValues.add(arrayIndexFieldValue); } } - return IntStream.range(0, valuesWithinRange.size()) - .mapToObj(index -> { - final RecordField elementField = new RecordField(arrayField.getFieldName(), elementDataType); - return new ArrayIndexFieldValue(valuesWithinRange.get(index), elementField, fieldValue, indexes.get(index)); - }); + return indexFieldValues.stream(); }); }); diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java index e9b1874b2b..dfb707129d 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java @@ -49,7 +49,7 @@ public class MultiMapKeyPath extends RecordPathSegment { final Map map = (Map) fieldValue.getValue(); return mapKeys.stream().map(key -> { final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType(); - final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + key + "']", valueType); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType); return new MapEntryFieldValue(map.get(key), elementField, fieldValue, key); }); }); diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java index ee57f3621d..201ddc15a7 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java @@ -45,7 +45,7 @@ public class SingularMapKeyPath extends RecordPathSegment { .filter(Filters.fieldTypeFilter(RecordFieldType.MAP)) .map(fieldValue -> { final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType(); - final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + mapKey + "']", valueType); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType); return new MapEntryFieldValue(getMapValue(fieldValue), elementField, fieldValue, mapKey); }); } diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java index b9241e8e9a..ab4b176c09 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java @@ -59,7 +59,7 @@ public class WildcardIndexPath extends RecordPathSegment { return map.entrySet().stream() .map(entry -> { final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType(); - final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + entry.getKey() + "']", valueType); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), valueType); return new MapEntryFieldValue(entry.getValue(), elementField, fieldValue, entry.getKey()); }); } else { diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 0c96111547..67e390a2e0 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -17,6 +17,7 @@ package org.apache.nifi.record.path; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -28,7 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.nifi.record.path.exception.RecordPathException; @@ -134,6 +134,10 @@ public class TestRecordPath { assertEquals("balance", fieldValues.get(1).getField().getFieldName()); assertEquals(123.45D, fieldValues.get(1).getValue()); + + RecordPath.compile("/mainAccount/*[. > 100]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(122.44D)); + assertEquals(1, accountValues.get("id")); + assertEquals(122.44D, accountValues.get("balance")); } @Test @@ -154,9 +158,21 @@ public class TestRecordPath { assertEquals(1, fieldValues.size()); final FieldValue fieldValue = fieldValues.get(0); - assertTrue(fieldValue.getField().getFieldName().equals("accounts")); + assertEquals("accounts", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(accountRecord, fieldValue.getValue()); + + final Map updatedAccountValues = new HashMap<>(accountValues); + updatedAccountValues.put("balance", 122.44D); + final Record updatedAccountRecord = new MapRecord(getAccountSchema(), updatedAccountValues); + RecordPath.compile("/*[0]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(updatedAccountRecord)); + + final Object[] accountRecords = (Object[]) record.getValue("accounts"); + assertEquals(1, accountRecords.length); + final Record recordToVerify = (Record) accountRecords[0]; + assertEquals(122.44D, recordToVerify.getValue("balance")); + assertEquals(48, record.getValue("id")); + assertEquals("John Doe", record.getValue("name")); } @Test @@ -228,11 +244,30 @@ public class TestRecordPath { final Record record = new MapRecord(schema, values); final FieldValue fieldValue = RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get(); - assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['")); + assertTrue(fieldValue.getField().getFieldName().equals("attributes")); assertEquals("New York", fieldValue.getValue()); assertEquals(record, fieldValue.getParentRecord().get()); } + @Test + @SuppressWarnings("unchecked") + public void testUpdateMap() { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); + + final Map attributes = new HashMap<>(); + attributes.put("city", "New York"); + attributes.put("state", "NY"); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("name", "John Doe"); + values.put("attributes", attributes); + final Record record = new MapRecord(schema, values); + + RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get().updateValue("Boston"); + assertEquals("Boston", ((Map) record.getValue("attributes")).get("city")); + } + @Test public void testMapWildcard() { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); @@ -254,9 +289,18 @@ public class TestRecordPath { assertEquals("NY", fieldValues.get(1).getValue()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['")); + assertEquals("attributes", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } + + RecordPath.compile("/attributes[*]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue("Unknown")); + assertEquals("Unknown", attributes.get("city")); + assertEquals("Unknown", attributes.get("state")); + + RecordPath.compile("/attributes[*][fieldName(.) = 'attributes']").evaluate(record).getSelectedFields().forEach(field -> field.updateValue("Unknown")); + assertEquals("Unknown", attributes.get("city")); + assertEquals("Unknown", attributes.get("state")); + } @Test @@ -280,9 +324,13 @@ public class TestRecordPath { assertEquals("NY", fieldValues.get(1).getValue()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['")); + assertEquals("attributes", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } + + RecordPath.compile("/attributes['city', 'state']").evaluate(record).getSelectedFields().forEach(field -> field.updateValue("Unknown")); + assertEquals("Unknown", attributes.get("city")); + assertEquals("Unknown", attributes.get("state")); } @Test @@ -314,7 +362,7 @@ public class TestRecordPath { final Record record = new MapRecord(schema, values); final FieldValue fieldValue = RecordPath.compile("/numbers[3]").evaluate(record).getSelectedFields().findFirst().get(); - assertTrue(fieldValue.getField().getFieldName().equals("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(3, fieldValue.getValue()); assertEquals(record, fieldValue.getParentRecord().get()); } @@ -330,7 +378,7 @@ public class TestRecordPath { final List fieldValues = RecordPath.compile("/numbers[0..1]").evaluate(record).getSelectedFields().collect(Collectors.toList()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().equals("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } @@ -354,11 +402,13 @@ public class TestRecordPath { int i = 0; final int[] expectedValues = new int[] {3, 6, 9, 8}; for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().startsWith("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(expectedValues[i++], fieldValue.getValue()); assertEquals(record, fieldValue.getParentRecord().get()); } + RecordPath.compile("/numbers[3,6, -1, -2]").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(99)); + assertArrayEquals(new Object[] {0, 1, 2, 99, 4, 5, 99, 7, 99, 99}, (Object[]) values.get("numbers")); } @Test @@ -372,7 +422,7 @@ public class TestRecordPath { List fieldValues = RecordPath.compile("/numbers[0, 2, 4..7, 9]").evaluate(record).getSelectedFields().collect(Collectors.toList()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().startsWith("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } @@ -384,7 +434,7 @@ public class TestRecordPath { fieldValues = RecordPath.compile("/numbers[0..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().equals("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; @@ -396,7 +446,7 @@ public class TestRecordPath { fieldValues = RecordPath.compile("/numbers[-1..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().equals("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } expectedValues = new int[] {9}; @@ -407,7 +457,7 @@ public class TestRecordPath { fieldValues = RecordPath.compile("/numbers[*]").evaluate(record).getSelectedFields().collect(Collectors.toList()); for (final FieldValue fieldValue : fieldValues) { - assertTrue(fieldValue.getField().getFieldName().equals("numbers")); + assertEquals("numbers", fieldValue.getField().getFieldName()); assertEquals(record, fieldValue.getParentRecord().get()); } expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; @@ -441,7 +491,7 @@ public class TestRecordPath { for (final FieldValue fieldValue : fieldValues) { final String fieldName = fieldValue.getField().getFieldName(); - assertTrue(Pattern.compile("numbers").matcher(fieldName).matches()); + assertEquals("numbers", fieldName); assertEquals(RecordFieldType.INT, fieldValue.getField().getDataType().getFieldType()); assertEquals(4, fieldValue.getValue()); assertEquals(record, fieldValue.getParentRecord().get()); @@ -478,6 +528,9 @@ public class TestRecordPath { assertEquals(accountRecord, fieldValue.getParentRecord().get()); assertEquals(123.45D, fieldValue.getValue()); assertEquals("balance", fieldValue.getField().getFieldName()); + + RecordPath.compile("/mainAccount/././balance/.").evaluate(record).getSelectedFields().forEach(field -> field.updateValue(123.44D)); + assertEquals(123.44D, accountValues.get("balance")); } @Test diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index adbb6e36a2..48f661f9b9 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -832,13 +832,7 @@ public class AvroTypeUtil { map.put(key, obj); } - final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType()); - final List mapFields = new ArrayList<>(); - for (final String key : map.keySet()) { - mapFields.add(new RecordField(key, elementType, true)); - } - final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); - return new MapRecord(mapSchema, map); + return map; } return value;