From 8ffa1703ba0ff82aa2127c511adfaf2241fb1c8e Mon Sep 17 00:00:00 2001 From: Bryan Rosander Date: Wed, 25 Jan 2017 11:40:05 -0500 Subject: [PATCH] NIFI-3389 - Using long string type for attribute name and value in FlowFileSchema This closes #1446. --- .../SchemaRepositoryRecordSerde.java | 22 +- .../repository/schema/FlowFileSchema.java | 20 ++ .../schema/RepositoryRecordSchema.java | 54 +++- .../schema/RepositoryRecordUpdate.java | 2 +- .../controller/swap/SchemaSwapSerializer.java | 4 +- .../nifi/controller/swap/SwapSchema.java | 34 ++- .../SchemaRepositoryRecordSerdeTest.java | 266 ++++++++++++++++++ 7 files changed, 383 insertions(+), 19 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java index c0c9d180f1..75f6ff20c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -45,9 +45,9 @@ import org.wali.UpdateType; public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe { private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class); - private static final int MAX_ENCODING_VERSION = 1; + private static final int MAX_ENCODING_VERSION = 2; - private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1; + private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2; private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; private final ResourceClaimManager resourceClaimManager; @@ -73,25 +73,29 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement switch (record.getType()) { case CREATE: case UPDATE: - schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1; + schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2; break; case CONTENTMISSING: case DELETE: - schema = RepositoryRecordSchema.DELETE_SCHEMA_V1; + schema = RepositoryRecordSchema.DELETE_SCHEMA_V2; break; case SWAP_IN: - schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1; + schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2; break; case SWAP_OUT: - schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1; + schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2; break; default: throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen. } - final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema); - final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + serializeRecord(record, out, schema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2); + } + + protected void serializeRecord(final RepositoryRecord record, final DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) throws IOException { + final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema); + final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, repositoryRecordSchema); new SchemaRecordWriter().writeRecord(update, out); } @@ -112,7 +116,7 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the // top level that indicates which type of record we have. - final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1); + final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2); final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD); final UpdateType updateType = UpdateType.valueOf(actionType); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java index 53eab703d7..6af306686d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java @@ -45,6 +45,7 @@ public class FlowFileSchema { public static final String ATTRIBUTE_VALUE = "Attribute Value"; public static final RecordSchema FLOWFILE_SCHEMA_V1; + public static final RecordSchema FLOWFILE_SCHEMA_V2; static { final List flowFileFields = new ArrayList<>(); @@ -64,4 +65,23 @@ public class FlowFileSchema { FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields); } + + static { + final List flowFileFields = new ArrayList<>(); + + final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.LONG_STRING, Repetition.EXACTLY_ONE); + final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.LONG_STRING, Repetition.EXACTLY_ONE); + + flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields())); + flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE)); + + FLOWFILE_SCHEMA_V2 = new RecordSchema(flowFileFields); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java index f99b5d9957..db77c8baf4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java @@ -17,10 +17,6 @@ package org.apache.nifi.controller.repository.schema; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - import org.apache.nifi.repository.schema.ComplexRecordField; import org.apache.nifi.repository.schema.FieldType; import org.apache.nifi.repository.schema.RecordField; @@ -29,9 +25,13 @@ import org.apache.nifi.repository.schema.Repetition; import org.apache.nifi.repository.schema.SimpleRecordField; import org.apache.nifi.repository.schema.UnionRecordField; -public class RepositoryRecordSchema { +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +public class RepositoryRecordSchema { public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository Record Update"; // top level field name + public static final String REPOSITORY_RECORD_UPDATE_V2 = "Repository Record Update"; // top level field name // repository record fields public static final String ACTION_TYPE = "Action"; @@ -51,6 +51,12 @@ public class RepositoryRecordSchema { public static final RecordSchema SWAP_IN_SCHEMA_V1; public static final RecordSchema SWAP_OUT_SCHEMA_V1; + public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V2; + public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V2; + public static final RecordSchema DELETE_SCHEMA_V2; + public static final RecordSchema SWAP_IN_SCHEMA_V2; + public static final RecordSchema SWAP_OUT_SCHEMA_V2; + public static final RecordField ACTION_TYPE_FIELD = new SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE); public static final RecordField RECORD_ID_FIELD = new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE); @@ -91,4 +97,42 @@ public class RepositoryRecordSchema { final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn); REPOSITORY_RECORD_SCHEMA_V1 = new RecordSchema(Collections.singletonList(repoUpdateField)); } + + static { + // Fields for "Create" or "Update" records + final List createOrUpdateFields = new ArrayList<>(); + createOrUpdateFields.add(ACTION_TYPE_FIELD); + createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()); + + createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE)); + final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields); + CREATE_OR_UPDATE_SCHEMA_V2 = new RecordSchema(createOrUpdateFields); + + // Fields for "Delete" records + final List deleteFields = new ArrayList<>(); + deleteFields.add(ACTION_TYPE_FIELD); + deleteFields.add(RECORD_ID_FIELD); + final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields); + DELETE_SCHEMA_V2 = new RecordSchema(deleteFields); + + // Fields for "Swap Out" records + final List swapOutFields = new ArrayList<>(); + swapOutFields.add(ACTION_TYPE_FIELD); + swapOutFields.add(RECORD_ID_FIELD); + swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields); + SWAP_OUT_SCHEMA_V2 = new RecordSchema(swapOutFields); + + // Fields for "Swap In" records + final List swapInFields = new ArrayList<>(createOrUpdateFields); + swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields); + SWAP_IN_SCHEMA_V2 = new RecordSchema(swapInFields); + + // Union Field that creates the top-level field type + final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V2, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn); + REPOSITORY_RECORD_SCHEMA_V2 = new RecordSchema(Collections.singletonList(repoUpdateField)); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java index ad51f4d830..c11353b552 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java @@ -38,7 +38,7 @@ public class RepositoryRecordUpdate implements Record { @Override public Object getFieldValue(final String fieldName) { - if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1.equals(fieldName)) { + if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) { final String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE); final UpdateType updateType = UpdateType.valueOf(actionType); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java index 195f55af8f..96c7ddcc15 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java @@ -46,7 +46,7 @@ import org.apache.nifi.repository.schema.SimpleRecordField; public class SchemaSwapSerializer implements SwapSerializer { static final String SERIALIZATION_NAME = "Schema Swap Serialization"; - private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V1; + private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V2; private final RecordSchema flowFileSchema = new RecordSchema(schema.getField(SwapSchema.FLOWFILE_CONTENTS).getSubFields()); @Override @@ -78,7 +78,7 @@ public class SchemaSwapSerializer implements SwapSerializer { // Create a simple record to hold the summary and the flowfile contents final RecordField summaryField = new SimpleRecordField(SwapSchema.SWAP_SUMMARY, FieldType.COMPLEX, Repetition.EXACTLY_ONE); - final RecordField contentsField = new ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()); + final RecordField contentsField = new ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()); final List fields = new ArrayList<>(2); fields.add(summaryField); fields.add(contentsField); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java index 70fb539272..6908900d23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java @@ -32,11 +32,14 @@ import org.apache.nifi.repository.schema.Repetition; import org.apache.nifi.repository.schema.SimpleRecordField; public class SwapSchema { - public static final RecordSchema SWAP_SUMMARY_SCHEMA_V1; public static final RecordSchema SWAP_CONTENTS_SCHEMA_V1; public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V1; + public static final RecordSchema SWAP_SUMMARY_SCHEMA_V2; + public static final RecordSchema SWAP_CONTENTS_SCHEMA_V2; + public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V2; + public static final String RESOURCE_CLAIMS = "Resource Claims"; public static final String RESOURCE_CLAIM = "Resource Claim"; public static final String RESOURCE_CLAIM_COUNT = "Claim Count"; @@ -48,7 +51,6 @@ public class SwapSchema { public static final String SWAP_SUMMARY = "Swap Summary"; public static final String FLOWFILE_CONTENTS = "FlowFiles"; - static { final RecordField queueIdentifier = new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE); final RecordField flowFileCount = new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE); @@ -76,4 +78,32 @@ public class SwapSchema { fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields())); FULL_SWAP_FILE_SCHEMA_V1 = new RecordSchema(fullSchemaFields); } + + static { + final RecordField queueIdentifier = new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE); + final RecordField flowFileCount = new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE); + final RecordField flowFileSize = new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE); + final RecordField maxRecordId = new SimpleRecordField(MAX_RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE); + + final RecordField resourceClaimField = new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, ContentClaimSchema.RESOURCE_CLAIM_SCHEMA_V1.getFields()); + final RecordField claimCountField = new SimpleRecordField(RESOURCE_CLAIM_COUNT, FieldType.INT, Repetition.EXACTLY_ONE); + final RecordField resourceClaims = new MapRecordField(RESOURCE_CLAIMS, resourceClaimField, claimCountField, Repetition.EXACTLY_ONE); + + final List summaryFields = new ArrayList<>(); + summaryFields.add(queueIdentifier); + summaryFields.add(flowFileCount); + summaryFields.add(flowFileSize); + summaryFields.add(maxRecordId); + summaryFields.add(resourceClaims); + SWAP_SUMMARY_SCHEMA_V2 = new RecordSchema(summaryFields); + + final RecordField flowFiles = new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()); + final List contentsFields = Collections.singletonList(flowFiles); + SWAP_CONTENTS_SCHEMA_V2 = new RecordSchema(contentsFields); + + final List fullSchemaFields = new ArrayList<>(); + fullSchemaFields.add(new ComplexRecordField(SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields)); + fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields())); + FULL_SWAP_FILE_SCHEMA_V2 = new RecordSchema(fullSchemaFields); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java new file mode 100644 index 0000000000..59b0e7b907 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SchemaRepositoryRecordSerdeTest { + public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"; + private StandardResourceClaimManager resourceClaimManager; + private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde; + private Map queueMap; + private FlowFileQueue flowFileQueue; + private ByteArrayOutputStream byteArrayOutputStream; + private DataOutputStream dataOutputStream; + + @Before + public void setup() { + resourceClaimManager = new StandardResourceClaimManager(); + schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager); + queueMap = new HashMap<>(); + schemaRepositoryRecordSerde.setQueueMap(queueMap); + flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER); + byteArrayOutputStream = new ByteArrayOutputStream(); + dataOutputStream = new DataOutputStream(byteArrayOutputStream); + } + + @After + public void teardown() { + resourceClaimManager.purge(); + } + + @Test + public void testV1CreateCantHandleLongAttributeName() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1CreateCantHandleLongAttributeValue() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2CreateCanHandleLongAttributeName() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2CreateCanHandleLongAttributeValue() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testRoundTripCreateV1ToV2() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + attributes.put("testName", "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1SwapInCantHandleLongAttributeName() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1SwapInCantHandleLongAttributeValue() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2SwapInCanHandleLongAttributeName() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2SwapInCanHandleLongAttributeValue() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testRoundTripSwapInV1ToV2() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + attributes.put("testName", "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + assertEquals(SWAP_IN, repositoryRecord.getType()); + } + + private DataInputStream createDataInputStream() throws IOException { + dataOutputStream.flush(); + return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + } + + private StandardRepositoryRecord createCreateFlowFileRecord(Map attributes) { + StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue); + StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder(); + flowFileRecordBuilder.addAttributes(attributes); + standardRepositoryRecord.setWorking(flowFileRecordBuilder.build()); + return standardRepositoryRecord; + } + + private FlowFileQueue createMockQueue(String identifier) { + FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); + when(flowFileQueue.getIdentifier()).thenReturn(identifier); + queueMap.put(identifier, flowFileQueue); + return flowFileQueue; + } +}