From 39cfd0339756e3869d0e3f3bcf45569855d6bca3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 16 Dec 2016 09:01:01 -0500 Subject: [PATCH] NIFI-3205: This closes #1336. Ensure that FlowFile Repository is updated with any Transient Content Claims when session rollback occurs --- .../repository/RepositoryRecordType.java | 2 +- .../org/apache/nifi/util/MockFlowFile.java | 14 ++- .../repository/StandardFlowFileRecord.java | 6 +- .../repository/StandardProcessSession.java | 19 +++- .../TransientClaimRepositoryRecord.java | 89 +++++++++++++++++++ .../WriteAheadFlowFileRepository.java | 17 +++- .../TestStandardProcessSession.java | 77 ++++++++++++++++ 7 files changed, 213 insertions(+), 11 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java index ff8dc504fa..50221bba6a 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java @@ -21,5 +21,5 @@ package org.apache.nifi.controller.repository; */ public enum RepositoryRecordType { - UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT; + UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT, CLEANUP_TRANSIENT_CLAIMS; } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index 9848a3d856..1ff1a2ffc8 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.flowfile.FlowFile; @@ -142,7 +143,7 @@ public class MockFlowFile implements FlowFileRecord { @Override public int hashCode() { - return (int) id; + return new HashCodeBuilder(7, 13).append(id).toHashCode(); } @Override @@ -150,8 +151,11 @@ public class MockFlowFile implements FlowFileRecord { if (obj == null) { return false; } - if (obj instanceof MockFlowFile) { - return ((MockFlowFile) obj).id == this.id; + if (obj == this) { + return true; + } + if (obj instanceof FlowFile) { + return ((FlowFile) obj).getId() == this.id; } return false; } @@ -291,10 +295,12 @@ public class MockFlowFile implements FlowFileRecord { public long getQueueDateIndex() { return 0; } + public boolean isAttributeEqual(final String attributeName, final String expectedValue) { // unknown attribute name, so cannot be equal. - if (attributes.containsKey(attributeName) == false) + if (attributes.containsKey(attributeName) == false) { return false; + } String value = attributes.get(attributeName); return Objects.equals(expectedValue, value); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java index 607ccfd470..088f26dbd1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -145,11 +145,11 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { if (this == other) { return true; } - if (!(other instanceof StandardFlowFileRecord)) { + if (!(other instanceof FlowFile)) { return false; } - final StandardFlowFileRecord otherRecord = (StandardFlowFileRecord) other; - return new EqualsBuilder().append(id, otherRecord.id).isEquals(); + final FlowFile otherRecord = (FlowFile) other; + return new EqualsBuilder().append(id, otherRecord.getId()).isEquals(); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index f7dfd7330f..54987b9a8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -47,13 +47,13 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; @@ -973,6 +973,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + // If we have transient claims that need to be cleaned up, do so. + final List transientClaims = recordsToHandle.stream() + .flatMap(record -> record.getTransientClaims().stream()) + .collect(Collectors.toList()); + + if (!transientClaims.isEmpty()) { + final RepositoryRecord repoRecord = new TransientClaimRepositoryRecord(transientClaims); + try { + context.getFlowFileRepository().updateRepository(Collections.singletonList(repoRecord)); + } catch (final IOException ioe) { + LOG.error("Unable to update FlowFile repository to cleanup transient claims due to {}", ioe.toString()); + if (LOG.isDebugEnabled()) { + LOG.error("", ioe); + } + } + } + final Connectable connectable = context.getConnectable(); final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier()); flowFileEvent.setBytesRead(bytesRead); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java new file mode 100644 index 0000000000..8cf6952527 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.util.List; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; + +/** + * A simple RepositoryRecord that represents a Set of Content Claims that need to be cleaned up + */ +public class TransientClaimRepositoryRecord implements RepositoryRecord { + private final List claimsToCleanUp; + + public TransientClaimRepositoryRecord(final List claimsToCleanUp) { + this.claimsToCleanUp = claimsToCleanUp; + } + + @Override + public FlowFileQueue getDestination() { + return null; + } + + @Override + public FlowFileQueue getOriginalQueue() { + return null; + } + + @Override + public RepositoryRecordType getType() { + return RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS; + } + + @Override + public ContentClaim getCurrentClaim() { + return null; + } + + @Override + public ContentClaim getOriginalClaim() { + return null; + } + + @Override + public long getCurrentClaimOffset() { + return 0; + } + + @Override + public FlowFileRecord getCurrent() { + return null; + } + + @Override + public boolean isAttributesChanged() { + return false; + } + + @Override + public boolean isMarkedForAbort() { + return false; + } + + @Override + public String getSwapLocation() { + return null; + } + + @Override + public List getTransientClaims() { + return claimsToCleanUp; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 071527ca1f..b5807ca213 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -21,6 +21,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -35,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; @@ -201,13 +203,24 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private void updateRepository(final Collection records, final boolean sync) throws IOException { for (final RepositoryRecord record : records) { - if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING && record.getDestination() == null) { + if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING + && record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS && record.getDestination() == null) { throw new IllegalArgumentException("Record " + record + " has no destination and Type is " + record.getType()); } } + // Partition records by whether or not their type is 'CLEANUP_TRANSIENT_CLAIMS'. We do this because we don't want to send + // these types of records to the Write-Ahead Log. + final Map> partitionedRecords = records.stream() + .collect(Collectors.partitioningBy(record -> record.getType() == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS)); + + List recordsForWal = partitionedRecords.get(Boolean.FALSE); + if (recordsForWal == null) { + recordsForWal = Collections.emptyList(); + } + // update the repository. - final int partitionIndex = wal.update(records, sync); + final int partitionIndex = wal.update(recordsForWal, sync); // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful. // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 6f94994ef2..9070d0c408 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -81,6 +81,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Assert; @@ -1488,10 +1489,81 @@ public class TestStandardProcessSession { session.commit(); } + @Test + public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() { + FlowFile flowFile = session.create(); + for (int i = 0; i < 5; i++) { + final byte[] content = String.valueOf(i).getBytes(); + flowFile = session.write(flowFile, out -> out.write(content)); + } + + session.transfer(flowFile, new Relationship.Builder().name("success").build()); + session.commit(); + + final List repoUpdates = flowFileRepo.getUpdates(); + assertEquals(1, repoUpdates.size()); + + // Should be 4 transient claims because it was written to 5 times. So 4 transient + 1 actual claim. + final RepositoryRecord record = repoUpdates.get(0); + assertEquals(RepositoryRecordType.CREATE, record.getType()); + final List transientClaims = record.getTransientClaims(); + assertEquals(4, transientClaims.size()); + } + + + @Test + public void testUpdateFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() { + flowFileQueue.put(new MockFlowFile(1L)); + + FlowFile flowFile = session.get(); + for (int i = 0; i < 5; i++) { + final byte[] content = String.valueOf(i).getBytes(); + flowFile = session.write(flowFile, out -> out.write(content)); + } + + session.transfer(flowFile, new Relationship.Builder().name("success").build()); + session.commit(); + + final List repoUpdates = flowFileRepo.getUpdates(); + assertEquals(1, repoUpdates.size()); + + // Should be 4 transient claims because it was written to 5 times. So 4 transient + 1 actual claim. + final RepositoryRecord record = repoUpdates.get(0); + assertEquals(RepositoryRecordType.UPDATE, record.getType()); + final List transientClaims = record.getTransientClaims(); + assertEquals(4, transientClaims.size()); + } + + + @Test + public void testUpdateFlowFileModifiedMultipleTimesHasTransientClaimsOnRollback() { + flowFileQueue.put(new MockFlowFile(1L)); + + FlowFile flowFile = session.get(); + for (int i = 0; i < 5; i++) { + final byte[] content = String.valueOf(i).getBytes(); + flowFile = session.write(flowFile, out -> out.write(content)); + } + + session.rollback(); + + final List repoUpdates = flowFileRepo.getUpdates(); + assertEquals(1, repoUpdates.size()); + + // Should be 5 transient claims because it was written to 5 times and then rolled back so all + // content claims are 'transient'. + final RepositoryRecord record = repoUpdates.get(0); + assertEquals(RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS, record.getType()); + final List transientClaims = record.getTransientClaims(); + assertEquals(5, transientClaims.size()); + } + + private static class MockFlowFileRepository implements FlowFileRepository { private boolean failOnUpdate = false; private final AtomicLong idGenerator = new AtomicLong(0L); + private final List updates = new ArrayList<>(); public void setFailOnUpdate(final boolean fail) { this.failOnUpdate = fail; @@ -1516,6 +1588,11 @@ public class TestStandardProcessSession { if (failOnUpdate) { throw new IOException("FlowFile Repository told to fail on update for unit test"); } + updates.addAll(records); + } + + public List getUpdates() { + return updates; } @Override