NIFI-3389 - Using long string type for attribute name and value in FlowFileSchema

This closes #1446.
Authored by Bryan Rosander on 2017-01-25 11:40:05 -05:00; committed by Mark Payne
parent ca887308af
commit 8ffa1703ba
7 changed files with 383 additions and 19 deletions

View File

@ -45,9 +45,9 @@ import org.wali.UpdateType;
public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
private static final int MAX_ENCODING_VERSION = 1;
private static final int MAX_ENCODING_VERSION = 2;
private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1;
private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2;
private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
private final ResourceClaimManager resourceClaimManager;
@ -73,25 +73,29 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
switch (record.getType()) {
case CREATE:
case UPDATE:
schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1;
schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2;
break;
case CONTENTMISSING:
case DELETE:
schema = RepositoryRecordSchema.DELETE_SCHEMA_V1;
schema = RepositoryRecordSchema.DELETE_SCHEMA_V2;
break;
case SWAP_IN:
schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1;
schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2;
break;
case SWAP_OUT:
schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1;
schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2;
break;
default:
throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen.
}
final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
serializeRecord(record, out, schema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2);
}
protected void serializeRecord(final RepositoryRecord record, final DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) throws IOException {
final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, repositoryRecordSchema);
new SchemaRecordWriter().writeRecord(update, out);
}
@ -112,7 +116,7 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
// Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
// top level that indicates which type of record we have.
final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1);
final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2);
final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
final UpdateType updateType = UpdateType.valueOf(actionType);

View File

@ -45,6 +45,7 @@ public class FlowFileSchema {
public static final String ATTRIBUTE_VALUE = "Attribute Value";
public static final RecordSchema FLOWFILE_SCHEMA_V1;
public static final RecordSchema FLOWFILE_SCHEMA_V2;
static {
final List<RecordField> flowFileFields = new ArrayList<>();
@ -64,4 +65,23 @@ public class FlowFileSchema {
FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields);
}
static {
// Builds FLOWFILE_SCHEMA_V2. Differs from V1 in that attribute names and values
// use FieldType.LONG_STRING, allowing attributes longer than a standard
// 2-byte-length-prefixed string can hold (NIFI-3389).
final List<RecordField> flowFileFields = new ArrayList<>();
// Attribute map entries: LONG_STRING on both key and value is the V2 change.
final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.LONG_STRING, Repetition.EXACTLY_ONE);
final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.LONG_STRING, Repetition.EXACTLY_ONE);
flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE));
flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE));
flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE));
flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE));
// Content claim is optional (a FlowFile may have no content); claim sub-schema is unchanged from V1.
flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields()));
flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE));
FLOWFILE_SCHEMA_V2 = new RecordSchema(flowFileFields);
}
}

View File

@ -17,10 +17,6 @@
package org.apache.nifi.controller.repository.schema;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.repository.schema.ComplexRecordField;
import org.apache.nifi.repository.schema.FieldType;
import org.apache.nifi.repository.schema.RecordField;
@ -29,9 +25,13 @@ import org.apache.nifi.repository.schema.Repetition;
import org.apache.nifi.repository.schema.SimpleRecordField;
import org.apache.nifi.repository.schema.UnionRecordField;
public class RepositoryRecordSchema {
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class RepositoryRecordSchema {
public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository Record Update"; // top level field name
public static final String REPOSITORY_RECORD_UPDATE_V2 = "Repository Record Update"; // top level field name
// repository record fields
public static final String ACTION_TYPE = "Action";
@ -51,6 +51,12 @@ public class RepositoryRecordSchema {
public static final RecordSchema SWAP_IN_SCHEMA_V1;
public static final RecordSchema SWAP_OUT_SCHEMA_V1;
public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V2;
public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V2;
public static final RecordSchema DELETE_SCHEMA_V2;
public static final RecordSchema SWAP_IN_SCHEMA_V2;
public static final RecordSchema SWAP_OUT_SCHEMA_V2;
public static final RecordField ACTION_TYPE_FIELD = new SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE);
public static final RecordField RECORD_ID_FIELD = new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
@ -91,4 +97,42 @@ public class RepositoryRecordSchema {
final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn);
REPOSITORY_RECORD_SCHEMA_V1 = new RecordSchema(Collections.singletonList(repoUpdateField));
}
static {
// Builds the V2 repository-record schemas. Structurally the same as the V1
// block above, but built on FlowFileSchema.FLOWFILE_SCHEMA_V2 so that
// attribute names/values are LONG_STRING (NIFI-3389).
// Fields for "Create" or "Update" records
final List<RecordField> createOrUpdateFields = new ArrayList<>();
createOrUpdateFields.add(ACTION_TYPE_FIELD);
createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
// Swap location is optional here: present only while the FlowFile is swapped out.
createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE));
final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields);
CREATE_OR_UPDATE_SCHEMA_V2 = new RecordSchema(createOrUpdateFields);
// Fields for "Delete" records
final List<RecordField> deleteFields = new ArrayList<>();
deleteFields.add(ACTION_TYPE_FIELD);
deleteFields.add(RECORD_ID_FIELD);
final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields);
DELETE_SCHEMA_V2 = new RecordSchema(deleteFields);
// Fields for "Swap Out" records
final List<RecordField> swapOutFields = new ArrayList<>();
swapOutFields.add(ACTION_TYPE_FIELD);
swapOutFields.add(RECORD_ID_FIELD);
swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields);
SWAP_OUT_SCHEMA_V2 = new RecordSchema(swapOutFields);
// Fields for "Swap In" records
// NOTE(review): swapInFields is seeded from createOrUpdateFields, which already
// contains an optional SWAP_LOCATION; adding a required SWAP_LOCATION here means
// the field name appears twice in this schema — presumably mirrors V1; confirm intended.
final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields);
swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields);
SWAP_IN_SCHEMA_V2 = new RecordSchema(swapInFields);
// Union Field that creates the top-level field type
final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V2, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn);
REPOSITORY_RECORD_SCHEMA_V2 = new RecordSchema(Collections.singletonList(repoUpdateField));
}
}

View File

@ -38,7 +38,7 @@ public class RepositoryRecordUpdate implements Record {
@Override
public Object getFieldValue(final String fieldName) {
if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1.equals(fieldName)) {
if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) {
final String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
final UpdateType updateType = UpdateType.valueOf(actionType);

View File

@ -46,7 +46,7 @@ import org.apache.nifi.repository.schema.SimpleRecordField;
public class SchemaSwapSerializer implements SwapSerializer {
static final String SERIALIZATION_NAME = "Schema Swap Serialization";
private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V1;
private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V2;
private final RecordSchema flowFileSchema = new RecordSchema(schema.getField(SwapSchema.FLOWFILE_CONTENTS).getSubFields());
@Override
@ -78,7 +78,7 @@ public class SchemaSwapSerializer implements SwapSerializer {
// Create a simple record to hold the summary and the flowfile contents
final RecordField summaryField = new SimpleRecordField(SwapSchema.SWAP_SUMMARY, FieldType.COMPLEX, Repetition.EXACTLY_ONE);
final RecordField contentsField = new ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
final RecordField contentsField = new ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
final List<RecordField> fields = new ArrayList<>(2);
fields.add(summaryField);
fields.add(contentsField);

View File

@ -32,11 +32,14 @@ import org.apache.nifi.repository.schema.Repetition;
import org.apache.nifi.repository.schema.SimpleRecordField;
public class SwapSchema {
public static final RecordSchema SWAP_SUMMARY_SCHEMA_V1;
public static final RecordSchema SWAP_CONTENTS_SCHEMA_V1;
public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V1;
public static final RecordSchema SWAP_SUMMARY_SCHEMA_V2;
public static final RecordSchema SWAP_CONTENTS_SCHEMA_V2;
public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V2;
public static final String RESOURCE_CLAIMS = "Resource Claims";
public static final String RESOURCE_CLAIM = "Resource Claim";
public static final String RESOURCE_CLAIM_COUNT = "Claim Count";
@ -48,7 +51,6 @@ public class SwapSchema {
public static final String SWAP_SUMMARY = "Swap Summary";
public static final String FLOWFILE_CONTENTS = "FlowFiles";
static {
final RecordField queueIdentifier = new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
final RecordField flowFileCount = new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
@ -76,4 +78,32 @@ public class SwapSchema {
fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()));
FULL_SWAP_FILE_SCHEMA_V1 = new RecordSchema(fullSchemaFields);
}
static {
// Builds the V2 swap-file schemas. Identical in structure to the V1 block above
// except that FlowFile contents use FlowFileSchema.FLOWFILE_SCHEMA_V2
// (LONG_STRING attribute names/values, NIFI-3389).
final RecordField queueIdentifier = new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
final RecordField flowFileCount = new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
final RecordField flowFileSize = new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE);
final RecordField maxRecordId = new SimpleRecordField(MAX_RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
// Resource claims are stored as a map of claim -> reference count; claim sub-schema unchanged from V1.
final RecordField resourceClaimField = new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, ContentClaimSchema.RESOURCE_CLAIM_SCHEMA_V1.getFields());
final RecordField claimCountField = new SimpleRecordField(RESOURCE_CLAIM_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
final RecordField resourceClaims = new MapRecordField(RESOURCE_CLAIMS, resourceClaimField, claimCountField, Repetition.EXACTLY_ONE);
final List<RecordField> summaryFields = new ArrayList<>();
summaryFields.add(queueIdentifier);
summaryFields.add(flowFileCount);
summaryFields.add(flowFileSize);
summaryFields.add(maxRecordId);
summaryFields.add(resourceClaims);
SWAP_SUMMARY_SCHEMA_V2 = new RecordSchema(summaryFields);
// Contents schema: zero or more FlowFile records, each using the V2 FlowFile layout.
final RecordField flowFiles = new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields());
final List<RecordField> contentsFields = Collections.singletonList(flowFiles);
SWAP_CONTENTS_SCHEMA_V2 = new RecordSchema(contentsFields);
// Full swap file = summary followed by contents.
final List<RecordField> fullSchemaFields = new ArrayList<>();
fullSchemaFields.add(new ComplexRecordField(SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields));
fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()));
FULL_SWAP_FILE_SCHEMA_V2 = new RecordSchema(fullSchemaFields);
}
}

View File

@ -0,0 +1,266 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.repository;

import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests that V2 of the repository record schema round-trips attribute names and
 * values that exceed the classic 2-byte-length UTF string limit, while V1 does not
 * (NIFI-3389).
 */
public class SchemaRepositoryRecordSerdeTest {
    public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier";

    // One character longer than a 2-byte-length-prefixed UTF string can hold (65535),
    // so V1 serialization is expected to truncate/corrupt it while V2 preserves it.
    private static final int LONG_STRING_LENGTH = 65536;

    // Serialization version passed to deserializeRecord in every test.
    private static final int SERIALIZATION_VERSION = 2;

    private StandardResourceClaimManager resourceClaimManager;
    private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde;
    private Map<String, FlowFileQueue> queueMap;
    private FlowFileQueue flowFileQueue;
    private ByteArrayOutputStream byteArrayOutputStream;
    private DataOutputStream dataOutputStream;

    @Before
    public void setup() {
        resourceClaimManager = new StandardResourceClaimManager();
        schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager);
        queueMap = new HashMap<>();
        schemaRepositoryRecordSerde.setQueueMap(queueMap);
        flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER);
        byteArrayOutputStream = new ByteArrayOutputStream();
        dataOutputStream = new DataOutputStream(byteArrayOutputStream);
    }

    @After
    public void teardown() {
        resourceClaimManager.purge();
    }

    @Test
    public void testV1CreateCantHandleLongAttributeName() throws IOException {
        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(longString(), "testValue");
        serializeV1(createCreateFlowFileRecord(attributes), RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1);

        assertNotEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV1CreateCantHandleLongAttributeValue() throws IOException {
        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("testName", longString());
        serializeV1(createCreateFlowFileRecord(attributes), RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1);

        assertNotEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV2CreateCanHandleLongAttributeName() throws IOException {
        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(longString(), "testValue");
        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);

        assertEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV2CreateCanHandleLongAttributeValue() throws IOException {
        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("testName", longString());
        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);

        assertEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testRoundTripCreateV1ToV2() throws IOException {
        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("testName", "testValue");
        serializeV1(createCreateFlowFileRecord(attributes), RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1);

        assertEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV1SwapInCantHandleLongAttributeName() throws IOException {
        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(longString(), "testValue");
        StandardRepositoryRecord record = createSwapInRecord(attributes);
        serializeV1(record, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1);

        assertNotEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV1SwapInCantHandleLongAttributeValue() throws IOException {
        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("testName", longString());
        StandardRepositoryRecord record = createSwapInRecord(attributes);
        serializeV1(record, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1);

        assertNotEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV2SwapInCanHandleLongAttributeName() throws IOException {
        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put(longString(), "testValue");
        StandardRepositoryRecord record = createSwapInRecord(attributes);
        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);

        assertEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testV2SwapInCanHandleLongAttributeValue() throws IOException {
        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("testName", longString());
        StandardRepositoryRecord record = createSwapInRecord(attributes);
        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);

        assertEquals(attributes, deserialize().getCurrent().getAttributes());
    }

    @Test
    public void testRoundTripSwapInV1ToV2() throws IOException {
        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("testName", "testValue");
        StandardRepositoryRecord record = createSwapInRecord(attributes);
        serializeV1(record, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1);

        RepositoryRecord repositoryRecord = deserialize();
        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
        assertEquals(SWAP_IN, repositoryRecord.getType());
    }

    /** Builds a string one character past the 65535 limit of a short-length UTF field. */
    private static String longString() {
        StringBuilder stringBuilder = new StringBuilder(LONG_STRING_LENGTH);
        for (int i = 0; i < LONG_STRING_LENGTH; i++) {
            stringBuilder.append('a');
        }
        return stringBuilder.toString();
    }

    /** Serializes a record using the explicit V1 record schema and V1 top-level schema. */
    private void serializeV1(StandardRepositoryRecord record, org.apache.nifi.repository.schema.RecordSchema recordSchema) throws IOException {
        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, recordSchema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
    }

    /** Flushes the output, reads back the header, and deserializes a single record. */
    private RepositoryRecord deserialize() throws IOException {
        DataInputStream dataInputStream = createDataInputStream();
        schemaRepositoryRecordSerde.readHeader(dataInputStream);
        return schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, SERIALIZATION_VERSION);
    }

    private DataInputStream createDataInputStream() throws IOException {
        dataOutputStream.flush();
        return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
    }

    /** Creates a CREATE-type repository record carrying the given attributes. */
    private StandardRepositoryRecord createCreateFlowFileRecord(Map<String, String> attributes) {
        StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue);
        StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder();
        flowFileRecordBuilder.addAttributes(attributes);
        standardRepositoryRecord.setWorking(flowFileRecordBuilder.build());
        return standardRepositoryRecord;
    }

    /** Creates a record and marks it swapped-in (setting a swap location flips its type to SWAP_IN). */
    private StandardRepositoryRecord createSwapInRecord(Map<String, String> attributes) {
        StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
        record.setSwapLocation("fake");
        assertEquals(SWAP_IN, record.getType());
        return record;
    }

    /** Registers a mock queue under the given identifier in the serde's queue map. */
    private FlowFileQueue createMockQueue(String identifier) {
        FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
        when(flowFileQueue.getIdentifier()).thenReturn(identifier);
        queueMap.put(identifier, flowFileQueue);
        return flowFileQueue;
    }
}