NIFI-4383 - Fix UpdateRecord when updating arrays elements. This closes #2208.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Pierre Villard 2017-10-12 23:51:09 +02:00 committed by Mark Payne
parent 98af3dc4cd
commit 99d767aa44
14 changed files with 561 additions and 16 deletions

View File

@ -49,7 +49,7 @@ public class ArrayIndexPath extends RecordPathSegment {
final RecordField arrayField = new RecordField(fieldValue.getField().getFieldName(), elementDataType); final RecordField arrayField = new RecordField(fieldValue.getField().getFieldName(), elementDataType);
final Object[] values = (Object[]) fieldValue.getValue(); final Object[] values = (Object[]) fieldValue.getValue();
final int arrayIndex = getArrayIndex(values.length); final int arrayIndex = getArrayIndex(values.length);
final RecordField elementField = new RecordField(arrayField.getFieldName() + "[" + arrayIndex + "]", elementDataType); final RecordField elementField = new RecordField(arrayField.getFieldName(), elementDataType);
final FieldValue result = new ArrayIndexFieldValue(values[arrayIndex], elementField, fieldValue, arrayIndex); final FieldValue result = new ArrayIndexFieldValue(values[arrayIndex], elementField, fieldValue, arrayIndex);
return result; return result;
}); });

View File

@ -57,6 +57,7 @@ public class MultiArrayIndexPath extends RecordPathSegment {
.filter(range -> values.length > Math.abs(range.getMin())) .filter(range -> values.length > Math.abs(range.getMin()))
.flatMap(range -> { .flatMap(range -> {
final List<Object> valuesWithinRange = new ArrayList<>(); final List<Object> valuesWithinRange = new ArrayList<>();
final List<Integer> indexes = new ArrayList<Integer>();
final int min = range.getMin() < 0 ? values.length + range.getMin() : range.getMin(); final int min = range.getMin() < 0 ? values.length + range.getMin() : range.getMin();
final int max = range.getMax() < 0 ? values.length + range.getMax() : range.getMax(); final int max = range.getMax() < 0 ? values.length + range.getMax() : range.getMax();
@ -64,13 +65,14 @@ public class MultiArrayIndexPath extends RecordPathSegment {
for (int i = min; i <= max; i++) { for (int i = min; i <= max; i++) {
if (values.length > i) { if (values.length > i) {
valuesWithinRange.add(values[i]); valuesWithinRange.add(values[i]);
indexes.add(i);
} }
} }
return IntStream.range(0, valuesWithinRange.size()) return IntStream.range(0, valuesWithinRange.size())
.mapToObj(index -> { .mapToObj(index -> {
final RecordField elementField = new RecordField(arrayField.getFieldName() + "[" + index + "]", elementDataType); final RecordField elementField = new RecordField(arrayField.getFieldName(), elementDataType);
return new ArrayIndexFieldValue(valuesWithinRange.get(index), elementField, fieldValue, index); return new ArrayIndexFieldValue(valuesWithinRange.get(index), elementField, fieldValue, indexes.get(index));
}); });
}); });

View File

@ -67,7 +67,7 @@ public class WildcardIndexPath extends RecordPathSegment {
return IntStream.range(0, array.length) return IntStream.range(0, array.length)
.mapToObj(index -> { .mapToObj(index -> {
final DataType elementDataType = ((ArrayDataType) fieldValue.getField().getDataType()).getElementType(); final DataType elementDataType = ((ArrayDataType) fieldValue.getField().getDataType()).getElementType();
final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "[" + index + "]", elementDataType); final RecordField elementField = new RecordField(fieldValue.getField().getFieldName(), elementDataType);
return new ArrayIndexFieldValue(array[index], elementField, fieldValue, index); return new ArrayIndexFieldValue(array[index], elementField, fieldValue, index);
}); });
} }

View File

@ -154,7 +154,7 @@ public class TestRecordPath {
assertEquals(1, fieldValues.size()); assertEquals(1, fieldValues.size());
final FieldValue fieldValue = fieldValues.get(0); final FieldValue fieldValue = fieldValues.get(0);
assertTrue(fieldValue.getField().getFieldName().startsWith("accounts[")); assertTrue(fieldValue.getField().getFieldName().equals("accounts"));
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
assertEquals(accountRecord, fieldValue.getValue()); assertEquals(accountRecord, fieldValue.getValue());
} }
@ -314,7 +314,7 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values); final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("/numbers[3]").evaluate(record).getSelectedFields().findFirst().get(); final FieldValue fieldValue = RecordPath.compile("/numbers[3]").evaluate(record).getSelectedFields().findFirst().get();
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals(3, fieldValue.getValue()); assertEquals(3, fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
@ -330,7 +330,7 @@ public class TestRecordPath {
final List<FieldValue> fieldValues = RecordPath.compile("/numbers[0..1]").evaluate(record).getSelectedFields().collect(Collectors.toList()); final List<FieldValue> fieldValues = RecordPath.compile("/numbers[0..1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
@ -354,7 +354,7 @@ public class TestRecordPath {
int i = 0; int i = 0;
final int[] expectedValues = new int[] {3, 6, 9, 8}; final int[] expectedValues = new int[] {3, 6, 9, 8};
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().startsWith("numbers"));
assertEquals(expectedValues[i++], fieldValue.getValue()); assertEquals(expectedValues[i++], fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
@ -372,7 +372,7 @@ public class TestRecordPath {
List<FieldValue> fieldValues = RecordPath.compile("/numbers[0, 2, 4..7, 9]").evaluate(record).getSelectedFields().collect(Collectors.toList()); List<FieldValue> fieldValues = RecordPath.compile("/numbers[0, 2, 4..7, 9]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().startsWith("numbers"));
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
@ -384,7 +384,7 @@ public class TestRecordPath {
fieldValues = RecordPath.compile("/numbers[0..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList()); fieldValues = RecordPath.compile("/numbers[0..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
@ -396,7 +396,7 @@ public class TestRecordPath {
fieldValues = RecordPath.compile("/numbers[-1..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList()); fieldValues = RecordPath.compile("/numbers[-1..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
expectedValues = new int[] {9}; expectedValues = new int[] {9};
@ -407,7 +407,7 @@ public class TestRecordPath {
fieldValues = RecordPath.compile("/numbers[*]").evaluate(record).getSelectedFields().collect(Collectors.toList()); fieldValues = RecordPath.compile("/numbers[*]").evaluate(record).getSelectedFields().collect(Collectors.toList());
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers[")); assertTrue(fieldValue.getField().getFieldName().equals("numbers"));
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
} }
expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
@ -441,7 +441,7 @@ public class TestRecordPath {
for (final FieldValue fieldValue : fieldValues) { for (final FieldValue fieldValue : fieldValues) {
final String fieldName = fieldValue.getField().getFieldName(); final String fieldName = fieldValue.getField().getFieldName();
assertTrue(Pattern.compile("numbers\\[\\d\\]").matcher(fieldName).matches()); assertTrue(Pattern.compile("numbers").matcher(fieldName).matches());
assertEquals(RecordFieldType.INT, fieldValue.getField().getDataType().getFieldType()); assertEquals(RecordFieldType.INT, fieldValue.getField().getDataType().getFieldType());
assertEquals(4, fieldValue.getValue()); assertEquals(4, fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
@ -536,7 +536,8 @@ public class TestRecordPath {
assertEquals(1, fieldValues.size()); assertEquals(1, fieldValues.size());
final FieldValue fieldValue = fieldValues.get(0); final FieldValue fieldValue = fieldValues.get(0);
assertEquals("accounts[0]", fieldValue.getField().getFieldName()); assertEquals("accounts", fieldValue.getField().getFieldName());
assertEquals(0, ((ArrayIndexFieldValue) fieldValue).getArrayIndex());
assertEquals(record, fieldValue.getParentRecord().get()); assertEquals(record, fieldValue.getParentRecord().get());
assertEquals(accountRecord1, fieldValue.getValue()); assertEquals(accountRecord1, fieldValue.getValue());
} }

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ConfigurableComponent;
@ -43,7 +44,6 @@ import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.junit.Assert; import org.junit.Assert;
import static java.util.Objects.requireNonNull;
public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup, NodeTypeProvider { public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup, NodeTypeProvider {
@ -154,7 +154,12 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
public boolean removeProperty(final PropertyDescriptor descriptor) { public boolean removeProperty(final PropertyDescriptor descriptor) {
Objects.requireNonNull(descriptor); Objects.requireNonNull(descriptor);
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName()); return removeProperty(descriptor.getName());
}
public boolean removeProperty(final String property) {
Objects.requireNonNull(property);
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(property);
String value = null; String value = null;
if ((value = properties.remove(fullyPopulatedDescriptor)) != null) { if ((value = properties.remove(fullyPopulatedDescriptor)) != null) {

View File

@ -803,6 +803,11 @@ public class StandardProcessorTestRunner implements TestRunner {
return context.removeProperty(descriptor); return context.removeProperty(descriptor);
} }
@Override
public boolean removeProperty(String property) {
return context.removeProperty(property);
}
@Override @Override
public List<ProvenanceEventRecord> getProvenanceEvents() { public List<ProvenanceEventRecord> getProvenanceEvents() {
return sharedState.getProvenanceEvents(); return sharedState.getProvenanceEvents();

View File

@ -855,6 +855,15 @@ public interface TestRunner {
*/ */
boolean removeProperty(PropertyDescriptor descriptor); boolean removeProperty(PropertyDescriptor descriptor);
/**
* Removes the property from the {@link ProcessContext},
* effectively setting its value to null, or the property's default value, if it has one.
*
* @param property name of the property to remove
* @return <code>true</code> if removed, <code>false</code> if the property was not set
*/
boolean removeProperty(String property);
/** /**
* Returns a {@link List} of all {@link ProvenanceEventRecord}s that were * Returns a {@link List} of all {@link ProvenanceEventRecord}s that were
* emitted by the Processor * emitted by the Processor

View File

@ -488,6 +488,7 @@
<exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/person-address.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-address.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/person-with-null-array.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-with-null-array.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/multi-arrays.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-null-array.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-null-array.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude>
@ -501,6 +502,10 @@
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-streets.json</exclude>
<!-- This file is copied from https://github.com/jeremyh/jBCrypt <!-- This file is copied from https://github.com/jeremyh/jBCrypt
because the binary is compiled for Java 8 and we must support Java 7 --> because the binary is compiled for Java 8 and we must support Java 7 -->
<exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude> <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>

View File

@ -17,11 +17,15 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collections; import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
@ -459,4 +463,203 @@ public class TestUpdateRecord {
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
} }
@Test
public void testUpdateSimpleArray() throws InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.LITERAL_VALUES);
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/numbers[*]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json")));
expectedOutput = expectedOutput.replaceFirst("1, null, 4", "8, 8, 8");
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/numbers[*]");
runner.clearTransferState();
runner.enqueue("{\"numbers\":null}");
runner.setProperty("/numbers[*]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
String content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
assertTrue(content.contains("\"numbers\" : null"));
runner.removeProperty("/numbers[*]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/numbers[1]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json")));
expectedOutput = expectedOutput.replaceFirst("1, null, 4", "1, 8, 4");
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/numbers[1]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/numbers[0..1]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json")));
expectedOutput = expectedOutput.replaceFirst("1, null, 4", "8, 8, 4");
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/numbers[0..1]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/numbers[0,2]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json")));
expectedOutput = expectedOutput.replaceFirst("1, null, 4", "8, null, 8");
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/numbers[0,2]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/numbers[0,1..2]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json")));
expectedOutput = expectedOutput.replaceFirst("1, null, 4", "8, 8, 8");
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/numbers[0,1..2]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/numbers[0..-1][. = 4]", "8");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json")));
expectedOutput = expectedOutput.replaceFirst("1, null, 4", "1, null, 8");
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/numbers[0..-1][. = 4]");
}
@Test
public void testUpdateComplexArrays() throws InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[*]", "/peoples[3]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
String content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
int count = StringUtils.countMatches(content, "Mary Doe");
assertEquals(4, count);
runner.removeProperty("/peoples[*]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[1]", "/peoples[3]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
count = StringUtils.countMatches(content, "Mary Doe");
assertEquals(2, count);
runner.removeProperty("/peoples[1]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0..1]", "/peoples[3]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json")));
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/peoples[0..1]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0,2]", "/peoples[3]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json")));
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/peoples[0,2]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0,1..2]", "/peoples[3]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
count = StringUtils.countMatches(content, "Mary Doe");
assertEquals(4, count);
runner.removeProperty("/peoples[0,1..2]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0..-1][./name != 'Mary Doe']", "/peoples[3]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
count = StringUtils.countMatches(content, "Mary Doe");
assertEquals(4, count);
runner.removeProperty("/peoples[0..-1][./name != 'Mary Doe']");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0..-1][./name != 'Mary Doe']/addresses[*]", "/peoples[3]/addresses[0]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
count = StringUtils.countMatches(content, "1 nifi road");
assertEquals(13, count);
runner.removeProperty("/peoples[0..-1][./name != 'Mary Doe']/addresses[*]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0..-1][./name != 'Mary Doe']/addresses[0,1..2]", "/peoples[3]/addresses[0]");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-streets.json")));
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
runner.removeProperty("/peoples[0..-1][./name != 'Mary Doe']/addresses[0,1..2]");
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/multi-arrays.json"));
runner.setProperty("/peoples[0..-1][./name != 'Mary Doe']/addresses[0,1..2]/city", "newCity");
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.LITERAL_VALUES);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
content = new String(runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).toByteArray());
count = StringUtils.countMatches(content, "newCity");
assertEquals(9, count);
runner.removeProperty("/peoples[0..-1][./name != 'Mary Doe']/addresses[0,1..2]/city");
}
} }

View File

@ -0,0 +1,64 @@
[ {
"numbers" : [ 1, null, 4 ],
"peoples" : [ {
"name" : "John Doe",
"addresses" : [ {
"street" : "1 nifi street",
"city" : "nificity"
}, {
"street" : "2 nifi street",
"city" : "nificity"
}, {
"street" : "3 nifi street",
"city" : "nificity"
}, {
"street" : "4 nifi street",
"city" : "nificity"
} ]
}, {
"name" : "Jane Doe",
"addresses" : [ {
"street" : "1 nifi avenue",
"city" : "nificity"
}, {
"street" : "2 nifi avenue",
"city" : "nificity"
}, {
"street" : "3 nifi avenue",
"city" : "nificity"
}, {
"street" : "4 nifi avenue",
"city" : "nificity"
} ]
}, {
"name" : "Tom Doe",
"addresses" : [ {
"street" : "1 nifi boulevard",
"city" : "nificity"
}, {
"street" : "2 nifi boulevard",
"city" : "nificity"
}, {
"street" : "3 nifi boulevard",
"city" : "nificity"
}, {
"street" : "4 nifi boulevard",
"city" : "nificity"
} ]
}, {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
} ]
} ]

View File

@ -0,0 +1,64 @@
[ {
"numbers" : [ 1, null, 4 ],
"peoples" : [ {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
}, {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
}, {
"name" : "Tom Doe",
"addresses" : [ {
"street" : "1 nifi boulevard",
"city" : "nificity"
}, {
"street" : "2 nifi boulevard",
"city" : "nificity"
}, {
"street" : "3 nifi boulevard",
"city" : "nificity"
}, {
"street" : "4 nifi boulevard",
"city" : "nificity"
} ]
}, {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
} ]
} ]

View File

@ -0,0 +1,64 @@
[ {
"numbers" : [ 1, null, 4 ],
"peoples" : [ {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
}, {
"name" : "Jane Doe",
"addresses" : [ {
"street" : "1 nifi avenue",
"city" : "nificity"
}, {
"street" : "2 nifi avenue",
"city" : "nificity"
}, {
"street" : "3 nifi avenue",
"city" : "nificity"
}, {
"street" : "4 nifi avenue",
"city" : "nificity"
} ]
}, {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
}, {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
} ]
} ]

View File

@ -0,0 +1,64 @@
[ {
"numbers" : [ 1, null, 4 ],
"peoples" : [ {
"name" : "John Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi street",
"city" : "nificity"
} ]
}, {
"name" : "Jane Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi avenue",
"city" : "nificity"
} ]
}, {
"name" : "Tom Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi boulevard",
"city" : "nificity"
} ]
}, {
"name" : "Mary Doe",
"addresses" : [ {
"street" : "1 nifi road",
"city" : "nificity"
}, {
"street" : "2 nifi road",
"city" : "nificity"
}, {
"street" : "3 nifi road",
"city" : "nificity"
}, {
"street" : "4 nifi road",
"city" : "nificity"
} ]
} ]
} ]

View File

@ -0,0 +1,59 @@
{
"name": "simpleArray",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "numbers",
"type": [
"null",
{
"type": "array",
"items": "int"
}
]
},
{
"name": "peoples",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "Person",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "addresses",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "Address",
"fields": [
{
"name": "street",
"type": "string"
},
{
"name": "city",
"type": "string"
}
]
}
}
]
}
]
}
}
]
}
]
}