diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml new file mode 100644 index 0000000000..02988c0d71 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-framework + 1.2.0-SNAPSHOT + + nifi-flowfile-repo-serialization + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-framework-api + + + org.apache.nifi + nifi-repository-models + + + org.apache.nifi + nifi-write-ahead-log + + + org.apache.nifi + nifi-schema-utils + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java new file mode 100644 index 0000000000..59b0e7b907 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SchemaRepositoryRecordSerdeTest { + public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"; + private StandardResourceClaimManager resourceClaimManager; + private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde; + private Map queueMap; + private FlowFileQueue flowFileQueue; + private ByteArrayOutputStream byteArrayOutputStream; + private DataOutputStream dataOutputStream; + + @Before + public void setup() { + resourceClaimManager = new StandardResourceClaimManager(); + schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager); + queueMap = new HashMap<>(); + schemaRepositoryRecordSerde.setQueueMap(queueMap); + flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER); + byteArrayOutputStream = new ByteArrayOutputStream(); + dataOutputStream = new DataOutputStream(byteArrayOutputStream); + } + + @After + public void teardown() { + resourceClaimManager.purge(); + } + + @Test + public void testV1CreateCantHandleLongAttributeName() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1CreateCantHandleLongAttributeValue() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2CreateCanHandleLongAttributeName() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2CreateCanHandleLongAttributeValue() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testRoundTripCreateV1ToV2() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + attributes.put("testName", "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1SwapInCantHandleLongAttributeName() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1SwapInCantHandleLongAttributeValue() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2SwapInCanHandleLongAttributeName() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2SwapInCanHandleLongAttributeValue() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testRoundTripSwapInV1ToV2() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map attributes = new HashMap<>(); + attributes.put("testName", "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + assertEquals(SWAP_IN, repositoryRecord.getType()); + } + + private DataInputStream createDataInputStream() throws IOException { + dataOutputStream.flush(); + return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + } + + private StandardRepositoryRecord createCreateFlowFileRecord(Map attributes) { + StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue); + StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder(); + flowFileRecordBuilder.addAttributes(attributes); + standardRepositoryRecord.setWorking(flowFileRecordBuilder.build()); + return standardRepositoryRecord; + } + + private FlowFileQueue createMockQueue(String identifier) { + FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); + when(flowFileQueue.getIdentifier()).thenReturn(identifier); + queueMap.put(identifier, flowFileQueue); + return flowFileQueue; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 54d777ff6f..6395e6e4de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -55,6 +55,10 @@ org.apache.nifi nifi-schema-utils + + org.apache.nifi + nifi-repository-models + org.apache.nifi nifi-properties @@ -135,6 +139,10 @@ org.apache.nifi nifi-write-ahead-log + + org.apache.nifi + nifi-flowfile-repo-serialization + org.apache.zookeeper zookeeper diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml new file mode 100644 index 0000000000..519b95a9f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-framework + 1.2.0-SNAPSHOT + + nifi-repository-models + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-framework-api + + + org.apache.nifi + nifi-schema-utils + + + org.apache.commons + commons-lang3 + + + org.apache.nifi + nifi-utils + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java similarity index 100% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml index 45494d4889..e1853320e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml @@ -29,6 +29,8 @@ nifi-runtime nifi-security nifi-site-to-site + nifi-repository-models + nifi-flowfile-repo-serialization nifi-framework-core nifi-framework-cluster-protocol nifi-framework-cluster diff --git a/nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml b/nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml new file mode 100644 index 0000000000..946b195d58 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml @@ -0,0 +1,27 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-toolkit + 1.2.0-SNAPSHOT + + nifi-toolkit-flowfile-repo + + + org.apache.nifi + nifi-utils + + + diff --git a/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java b/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java new file mode 100644 index 0000000000..d911a9d3a1 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.toolkit.repos.flowfile; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.StreamUtils; + +public class RepairCorruptedFileEndings { + private static final Pattern PARTITION_FILE_PATTERN = Pattern.compile("partition\\-\\d+"); + + private static void printUsage() { + System.out.println("Whenever a sudden power loss occurs, it is common with some operating systems for files that are being written to "); + System.out.println("to contain many NUL characters (hex 0) at the end of the file upon restart. If this happens to the FlowFile repository, "); + System.out.println("NiFi will be unable to recover, because it cannot properly read the repository. This utility attempts to read the FlowFile "); + System.out.println("Repository and write out a new copy of the repository, where the new copy does not contain the trailing NUL characters so "); + System.out.println("NiFi can be restarted by pointing at the new FlowFile Repository."); + System.out.println("Typically, this problem can be identified by seeing an error in the NiFi logs at startup, indicating either:"); + System.out.println(); + System.out.println("Caused by: java.io.IOException: Expected to read a Sentinel Byte of '1' but got a value of '0' instead"); + System.out.println(); + System.out.println("or:"); + System.out.println(); + System.out.println("Caused by: java.lang.IllegalArgumentException: No enum constant org.wali.UpdateType."); + System.out.println(); + System.out.println(); + System.out.println("Usage:"); + System.out.println("java " + RepairCorruptedFileEndings.class.getCanonicalName() + " "); + System.out.println(); + System.out.println(": The existing FlowFile Repository Directory that contains corrupt data"); + System.out.println(": The directory to write the repaired repository to"); + System.out.println(); + } + + public static void main(final String[] args) { + if (args.length != 2) { + printUsage(); + return; + } + + final File inputDir = new File(args[0]); + if (!inputDir.exists()) { + System.out.println("Input Repository Directory " + inputDir + " does not exist"); + return; + } + + final File[] inputFiles = inputDir.listFiles(); + if (inputFiles == null) { + System.out.println("Could not access files within input Repository Directory " + inputDir); + return; + } + + final List partitionDirs = Stream.of(inputFiles) + .filter(RepairCorruptedFileEndings::isPartitionDirectory) + .collect(Collectors.toList()); + + if (partitionDirs.isEmpty()) { + System.out.println("Found no partitions within input Repository Directory " + inputDir); + return; + } + + final File outputDir = new File(args[1]); + if (outputDir.exists()) { + final File[] children = outputDir.listFiles(); + if (children == null) { + System.out.println("Cannot access output Repository Directory " + outputDir); + return; + } + + if (children.length > 0) { + System.out.println("Output Repository Directory " + outputDir + " already exists and has files or sub-directories. " + + "The output directory must either not exist or be empty."); + return; + } + } else if (!outputDir.mkdirs()) { + System.out.println("Failed to create output Repository Directory " + outputDir); + return; + } + + final List nonPartitionDirFiles = Stream.of(inputFiles) + .filter(f -> !isPartitionDirectory(f)) + .filter(f -> !f.getName().equals("wali.lock")) + .collect(Collectors.toList()); + + for (final File nonPartitionFile : nonPartitionDirFiles) { + final File destination = new File(outputDir, nonPartitionFile.getName()); + try { + copy(nonPartitionFile, destination); + } catch (final IOException e) { + System.out.println("Failed to copy source file " + nonPartitionFile + " to destination file " + destination); + e.printStackTrace(); + } + } + + int fullCopies = 0; + int partialCopies = 0; + + for (final File partitionDir : partitionDirs) { + final File[] partitionFiles = partitionDir.listFiles(); + if (partitionFiles == null) { + System.out.println("Could not access children of input sub-directory " + partitionDir); + return; + } + + final File outputPartitionDir = new File(outputDir, partitionDir.getName()); + if (!outputPartitionDir.mkdirs()) { + System.out.println("Failed to created output directory " + outputPartitionDir); + return; + } + + for (final File partitionFile : partitionFiles) { + final File destinationFile = new File(outputPartitionDir, partitionFile.getName()); + + // All journal files follow the pattern of: + // ... + // The TRANSACTION_CONTINUE byte is a 1 while the TRANSACTION_COMMIT byte is a 2. So if we have 0's at the end then we know + // that we can simply truncate up until the point where we encounter the first of the of the trailing zeroes. At that point, + // we know that we are done. It is possible that the repo will still be 'corrupt' in that only part of a transaction was + // written out. However, this is okay because the repo will recover from this on restart. What it does NOT properly recover + // from on restart is when the file ends with a bunch of 0's because it believes that the Transaction ID is zero and then + // it reads in 0 bytes for the "Update Type" and as a result we get an invalid enum name because it thinks that the name of + // the UpdateType is an empty string because it's a string of length 0. + final int trailingZeroes; + try { + trailingZeroes = countTrailingZeroes(partitionFile); + } catch (final Exception e) { + System.out.println("Failed to read input file " + partitionFile); + e.printStackTrace(); + return; + } + + if (trailingZeroes > 0) { + final long goodLength = partitionFile.length() - trailingZeroes; + + try { + copy(partitionFile, destinationFile, goodLength); + partialCopies++; + } catch (final Exception e) { + System.out.println("Failed to copy " + goodLength + " bytes from " + partitionFile + " to " + destinationFile); + e.printStackTrace(); + return; + } + } else { + try { + copy(partitionFile, destinationFile); + } catch (final Exception e) { + System.out.println("Failed to copy entire file from " + partitionFile + " to " + destinationFile); + e.printStackTrace(); + return; + } + + fullCopies++; + } + } + } + + System.out.println("Successfully copied " + fullCopies + " journal files fully and truncated " + partialCopies + " journal files in output directory"); + } + + private static boolean isPartitionDirectory(final File file) { + return PARTITION_FILE_PATTERN.matcher(file.getName()).matches(); + } + + private static void copy(final File input, final File destination) throws IOException { + if (input.isFile()) { + copyFile(input, destination); + return; + } else { + copyDirectory(input, destination); + } + } + + private static void copyDirectory(final File input, final File destination) throws IOException { + if (!destination.exists() && !destination.mkdirs()) { + System.out.println("Failed to copy input directory " + input + " to destination because destination directory " + destination + + " does not exist and could not be created"); + return; + } + + final File[] children = input.listFiles(); + if (children == null) { + System.out.println("Failed to copy input directory " + input + " to destination because could not access files of input directory"); + return; + } + + for (final File child : children) { + final File destinationChild = new File(destination, child.getName()); + copy(child, destinationChild); + } + } + + private static void copyFile(final File input, final File destination) throws IOException { + if (!input.exists()) { + return; + } + + Files.copy(input.toPath(), destination.toPath(), StandardCopyOption.COPY_ATTRIBUTES); + } + + private static void copy(final File input, final File destination, final long length) throws IOException { + try (final InputStream fis = new FileInputStream(input); + final LimitingInputStream in = new LimitingInputStream(fis, length); + final OutputStream fos = new FileOutputStream(destination)) { + StreamUtils.copy(in, fos); + } + } + + static int countTrailingZeroes(final File partitionFile) throws IOException { + final RandomAccessFile raf = new RandomAccessFile(partitionFile, "r"); + + long startPos = partitionFile.length() - 4096; + + int count = 0; + boolean reachedStartOfFile = false; + while (!reachedStartOfFile) { + int bufferLength = 4096; + + if (startPos < 0) { + bufferLength = (int) (startPos + 4096); + startPos = 0; + reachedStartOfFile = true; + } + + raf.seek(startPos); + + final byte[] buffer = new byte[bufferLength]; + final int read = fillBuffer(raf, buffer); + + for (int i = read - 1; i >= 0; i--) { + final byte b = buffer[i]; + if (b == 0) { + count++; + } else { + return count; + } + } + + startPos -= 4096; + } + + return count; + } + + + private static int fillBuffer(final RandomAccessFile source, final byte[] destination) throws IOException { + int bytesRead = 0; + int len; + while (bytesRead < destination.length) { + len = source.read(destination, bytesRead, destination.length - bytesRead); + if (len < 0) { + break; + } + + bytesRead += len; + } + + return bytesRead; + } +} diff --git a/nifi-toolkit/nifi-toolkit-flowfile-repo/src/test/java/org/apache/nifi/toolkit/repos/flowfile/TestRepairCorruptedFileEndings.java b/nifi-toolkit/nifi-toolkit-flowfile-repo/src/test/java/org/apache/nifi/toolkit/repos/flowfile/TestRepairCorruptedFileEndings.java new file mode 100644 index 0000000000..7f8a7a1c47 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowfile-repo/src/test/java/org/apache/nifi/toolkit/repos/flowfile/TestRepairCorruptedFileEndings.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.toolkit.repos.flowfile; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; + +import org.junit.Test; + +public class TestRepairCorruptedFileEndings { + private final File targetFile = new File("target/1.bin"); + + @Before + @After + public void cleanup() { + if (targetFile.exists()) { + Assert.assertTrue(targetFile.delete()); + } + } + + @Test + public void testEndsWithZeroesGreaterThanBufferSize() throws IOException { + final byte[] data = new byte[4096 + 8]; + for (int i=0; i < 4096; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(8, zeroCount); + } + + @Test + public void testEndsWithZeroesSmallerThanBufferSize() throws IOException { + final byte[] data = new byte[1024]; + for (int i = 0; i < 1020; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(4, zeroCount); + } + + @Test + public void testEndsWithZeroesEqualToBufferSize() throws IOException { + final byte[] data = new byte[4096]; + for (int i = 0; i < 4090; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(6, zeroCount); + } + + + @Test + public void testAllZeroesGreaterThanBufferSize() throws IOException { + final byte[] data = new byte[4096 + 8]; + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(4096 + 8, zeroCount); + } + + @Test + public void testAllZeroesEqualToBufferSize() throws IOException { + final byte[] data = new byte[4096]; + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(4096, zeroCount); + } + + @Test + public void testAllZeroesSmallerThanBufferSize() throws IOException { + final byte[] data = new byte[1024]; + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(1024, zeroCount); + } + + + @Test + public void testSmallerThanBufferSize() throws IOException { + final byte[] data = new byte[1024]; + for (int i = 0; i < 1020; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(4, zeroCount); + } + + @Test + public void testSmallerThanBufferSizeNoTrailingZeroes() throws IOException { + final byte[] data = new byte[1024]; + for (int i = 0; i < 1024; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(0, zeroCount); + } + + + @Test + public void testLargerThanBufferSizeNoTrailingZeroes() throws IOException { + final byte[] data = new byte[8192]; + for (int i = 0; i < 8192; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(0, zeroCount); + } + + + @Test + public void testEqualToBufferSizeNoTrailingZeroes() throws IOException { + final byte[] data = new byte[4096]; + for (int i = 0; i < 4096; i++) { + data[i] = 'A'; + } + + Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + + final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile); + assertEquals(0, zeroCount); + } + +} diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml index b98325d86c..75661c7884 100644 --- a/nifi-toolkit/pom.xml +++ b/nifi-toolkit/pom.xml @@ -27,6 +27,7 @@ nifi-toolkit-encrypt-config nifi-toolkit-s2s nifi-toolkit-zookeeper-migrator + nifi-toolkit-flowfile-repo nifi-toolkit-assembly diff --git a/pom.xml b/pom.xml index 660977d40b..a735e13ea3 100644 --- a/pom.xml +++ b/pom.xml @@ -1,15 +1,16 @@ - + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + 4.0.0 org.apache @@ -87,7 +88,7 @@ language governing permissions and limitations under the License. --> 1.8 1.8 3.1.0 - + UTF-8 UTF-8 2014 @@ -330,9 +331,9 @@ language governing permissions and limitations under the License. --> quartz 2.2.1 - + c3p0 c3p0 @@ -401,8 +402,8 @@ language governing permissions and limitations under the License. --> spring-core ${spring.version} - + commons-logging commons-logging @@ -818,7 +819,7 @@ language governing permissions and limitations under the License. --> ${org.slf4j.version} - + org.apache.curator @@ -832,9 +833,9 @@ language governing permissions and limitations under the License. --> 6.8.8 test - - - + + + org.jsoup jsoup 1.8.3 @@ -870,6 +871,16 @@ language governing permissions and limitations under the License. --> nifi-expression-language 1.2.0-SNAPSHOT + + org.apache.nifi + nifi-flowfile-repo-serialization + 1.2.0-SNAPSHOT + + + org.apache.nifi + nifi-repository-models + 1.2.0-SNAPSHOT + org.apache.nifi nifi-custom-ui-utilities @@ -1253,37 +1264,37 @@ language governing permissions and limitations under the License. --> nifi-elasticsearch-nar 1.2.0-SNAPSHOT nar - - + + org.apache.nifi nifi-elasticsearch-5-nar - 1.2.0-SNAPSHOT + 1.2.0-SNAPSHOT nar - - + + org.apache.nifi nifi-lumberjack-nar 1.2.0-SNAPSHOT nar - + org.apache.nifi nifi-beats-nar 1.2.0-SNAPSHOT nar - + org.apache.nifi nifi-email-nar 1.2.0-SNAPSHOT nar - - org.apache.nifi - nifi-tcp-nar - 1.2.0-SNAPSHOT - nar - + + org.apache.nifi + nifi-tcp-nar + 1.2.0-SNAPSHOT + nar + org.apache.nifi nifi-splunk-nar @@ -1308,7 +1319,7 @@ language governing permissions and limitations under the License. --> 1.2.0-SNAPSHOT nar - + org.apache.nifi nifi-site-to-site-reporting-nar 1.2.0-SNAPSHOT @@ -1600,7 +1611,9 @@ language governing permissions and limitations under the License. --> **/*Spec.class true - -Xmx1G -Djava.net.preferIPv4Stack=true ${maven.surefire.arguments} + -Xmx1G + -Djava.net.preferIPv4Stack=true + ${maven.surefire.arguments} @@ -1804,71 +1817,100 @@ language governing permissions and limitations under the License. --> - + - - + + - - + + - + - + - + - + - - + + - - + + - - + + - + - + - - + + - + - - + + - - - - - - + + + + + + @@ -1908,9 +1950,10 @@ language governing permissions and limitations under the License. --> - + integration-tests @@ -1930,12 +1973,12 @@ language governing permissions and limitations under the License. --> - + contrib-check @@ -1991,14 +2034,16 @@ language governing permissions and limitations under the License. --> - + - + hortonworks @@ -2033,15 +2078,13 @@ language governing permissions and limitations under the License. --> - + - + mapr @@ -2057,15 +2100,13 @@ language governing permissions and limitations under the License. --> - + - + cloudera @@ -2081,10 +2122,8 @@ language governing permissions and limitations under the License. --> - +